1# -*- coding: utf-8 -*- 2# Copyright 2011 Google Inc. All Rights Reserved. 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15"""Unit and integration tests for gsutil command_runner module.""" 16 17from __future__ import absolute_import 18 19import logging 20import os 21import time 22 23import gslib 24from gslib import command_runner 25from gslib.command import Command 26from gslib.command_argument import CommandArgument 27from gslib.command_runner import CommandRunner 28from gslib.command_runner import HandleArgCoding 29from gslib.exception import CommandException 30from gslib.tab_complete import CloudObjectCompleter 31from gslib.tab_complete import CloudOrLocalObjectCompleter 32from gslib.tab_complete import LocalObjectCompleter 33from gslib.tab_complete import LocalObjectOrCannedACLCompleter 34from gslib.tab_complete import NoOpCompleter 35import gslib.tests.testcase as testcase 36import gslib.tests.util as util 37from gslib.tests.util import ARGCOMPLETE_AVAILABLE 38from gslib.tests.util import SetBotoConfigFileForTest 39from gslib.tests.util import SetBotoConfigForTest 40from gslib.tests.util import unittest 41from gslib.util import GSUTIL_PUB_TARBALL 42from gslib.util import SECONDS_PER_DAY 43 44 45class FakeArgparseArgument(object): 46 """Fake for argparse parser argument.""" 47 pass 48 49 50class FakeArgparseParser(object): 51 """Fake for argparse parser.""" 52 53 def __init__(self): 54 self.arguments = [] 55 56 def add_argument(self, *unused_args, **unused_kwargs): 57 argument = FakeArgparseArgument() 58 self.arguments.append(argument) 59 return argument 60 61 62class FakeArgparseSubparsers(object): 63 """Container for nested parsers.""" 64 65 def __init__(self): 66 self.parsers = [] 67 68 def add_parser(self, unused_name, **unused_kwargs): 69 parser = FakeArgparseParser() 70 self.parsers.append(parser) 71 return parser 72 73 74class FakeCommandWithInvalidCompleter(Command): 75 """Command with an invalid completer on an argument.""" 76 77 command_spec = Command.CreateCommandSpec( 78 'fake1', 79 argparse_arguments=[ 80 CommandArgument('arg', completer='BAD') 81 ] 82 ) 83 84 help_spec = Command.HelpSpec( 85 help_name='fake1', 86 help_name_aliases=[], 87 help_type='command_help', 88 help_one_line_summary='fake command for tests', 89 help_text='fake command for tests', 90 subcommand_help_text={} 91 ) 92 93 def __init__(self): 94 pass 95 96 97class FakeCommandWithCompleters(Command): 98 """Command with various completer types.""" 99 100 command_spec = Command.CreateCommandSpec( 101 'fake2', 102 argparse_arguments=[ 103 CommandArgument.MakeZeroOrMoreCloudURLsArgument(), 104 CommandArgument.MakeZeroOrMoreFileURLsArgument(), 105 CommandArgument.MakeZeroOrMoreCloudOrFileURLsArgument(), 106 CommandArgument.MakeFreeTextArgument(), 107 CommandArgument.MakeZeroOrMoreCloudBucketURLsArgument(), 108 CommandArgument.MakeFileURLOrCannedACLArgument(), 109 ] 110 ) 111 112 help_spec = Command.HelpSpec( 113 help_name='fake2', 114 help_name_aliases=[], 115 help_type='command_help', 116 help_one_line_summary='fake command for tests', 117 help_text='fake command for tests', 118 subcommand_help_text={} 119 ) 120 121 def __init__(self): 122 pass 123 124 125class TestCommandRunnerUnitTests( 126 testcase.unit_testcase.GsUtilUnitTestCase): 127 """Unit tests for gsutil update check in command_runner module.""" 128 129 def setUp(self): 130 """Sets up the command runner mock objects.""" 131 super(TestCommandRunnerUnitTests, self).setUp() 132 133 # Mock out the timestamp file so we can manipulate it. 134 self.previous_update_file = ( 135 command_runner.LAST_CHECKED_FOR_GSUTIL_UPDATE_TIMESTAMP_FILE) 136 self.timestamp_file = self.CreateTempFile() 137 command_runner.LAST_CHECKED_FOR_GSUTIL_UPDATE_TIMESTAMP_FILE = ( 138 self.timestamp_file) 139 140 # Mock out the gsutil version checker. 141 base_version = unicode(gslib.VERSION) 142 while not base_version.isnumeric(): 143 if not base_version: 144 raise CommandException( 145 'Version number (%s) is not numeric.' % gslib.VERSION) 146 base_version = base_version[:-1] 147 command_runner.LookUpGsutilVersion = lambda u, v: float(base_version) + 1 148 149 # Mock out raw_input to trigger yes prompt. 150 command_runner.raw_input = lambda p: 'y' 151 152 # Mock out TTY check to pretend we're on a TTY even if we're not. 153 self.running_interactively = True 154 command_runner.IsRunningInteractively = lambda: self.running_interactively 155 156 # Mock out the modified time of the VERSION file. 157 self.version_mod_time = 0 158 self.previous_version_mod_time = command_runner.GetGsutilVersionModifiedTime 159 command_runner.GetGsutilVersionModifiedTime = lambda: self.version_mod_time 160 161 # Create a fake pub tarball that will be used to check for gsutil version. 162 self.pub_bucket_uri = self.CreateBucket('pub') 163 self.gsutil_tarball_uri = self.CreateObject( 164 bucket_uri=self.pub_bucket_uri, object_name='gsutil.tar.gz', 165 contents='foo') 166 167 def tearDown(self): 168 """Tears down the command runner mock objects.""" 169 super(TestCommandRunnerUnitTests, self).tearDown() 170 171 command_runner.LAST_CHECKED_FOR_GSUTIL_UPDATE_TIMESTAMP_FILE = ( 172 self.previous_update_file) 173 command_runner.LookUpGsutilVersion = gslib.util.LookUpGsutilVersion 174 command_runner.raw_input = raw_input 175 176 command_runner.GetGsutilVersionModifiedTime = self.previous_version_mod_time 177 178 command_runner.IsRunningInteractively = gslib.util.IsRunningInteractively 179 180 self.gsutil_tarball_uri.delete_key() 181 self.pub_bucket_uri.delete_bucket() 182 183 def _IsPackageOrCloudSDKInstall(self): 184 # Update should not trigger for package installs or Cloud SDK installs. 185 return (gslib.IS_PACKAGE_INSTALL or 186 os.environ.get('CLOUDSDK_WRAPPER') == '1') 187 188 @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config') 189 def test_not_interactive(self): 190 """Tests that update is not triggered if not running interactively.""" 191 with SetBotoConfigForTest([ 192 ('GSUtil', 'software_update_check_period', '1')]): 193 with open(self.timestamp_file, 'w') as f: 194 f.write(str(int(time.time() - 2 * SECONDS_PER_DAY))) 195 self.running_interactively = False 196 self.assertEqual( 197 False, 198 self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0)) 199 200 @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config') 201 def test_no_tracker_file_version_recent(self): 202 """Tests when no timestamp file exists and VERSION file is recent.""" 203 if os.path.exists(self.timestamp_file): 204 os.remove(self.timestamp_file) 205 self.assertFalse(os.path.exists(self.timestamp_file)) 206 self.version_mod_time = time.time() 207 self.assertEqual( 208 False, 209 self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0)) 210 211 @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config') 212 def test_no_tracker_file_version_old(self): 213 """Tests when no timestamp file exists and VERSION file is old.""" 214 if os.path.exists(self.timestamp_file): 215 os.remove(self.timestamp_file) 216 self.assertFalse(os.path.exists(self.timestamp_file)) 217 self.version_mod_time = 0 218 expected = not self._IsPackageOrCloudSDKInstall() 219 self.assertEqual( 220 expected, 221 self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0)) 222 223 @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config') 224 def test_invalid_commands(self): 225 """Tests that update is not triggered for certain commands.""" 226 self.assertEqual( 227 False, 228 self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('update', 0)) 229 230 @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config') 231 def test_invalid_file_contents(self): 232 """Tests no update if timestamp file has invalid value.""" 233 with open(self.timestamp_file, 'w') as f: 234 f.write('NaN') 235 self.assertEqual( 236 False, 237 self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0)) 238 239 @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config') 240 def test_update_should_trigger(self): 241 """Tests update should be triggered if time is up.""" 242 with SetBotoConfigForTest([ 243 ('GSUtil', 'software_update_check_period', '1')]): 244 with open(self.timestamp_file, 'w') as f: 245 f.write(str(int(time.time() - 2 * SECONDS_PER_DAY))) 246 expected = not self._IsPackageOrCloudSDKInstall() 247 self.assertEqual( 248 expected, 249 self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0)) 250 251 @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config') 252 def test_not_time_for_update_yet(self): 253 """Tests update not triggered if not time yet.""" 254 with SetBotoConfigForTest([ 255 ('GSUtil', 'software_update_check_period', '3')]): 256 with open(self.timestamp_file, 'w') as f: 257 f.write(str(int(time.time() - 2 * SECONDS_PER_DAY))) 258 self.assertEqual( 259 False, 260 self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0)) 261 262 def test_user_says_no_to_update(self): 263 """Tests no update triggered if user says no at the prompt.""" 264 with SetBotoConfigForTest([ 265 ('GSUtil', 'software_update_check_period', '1')]): 266 with open(self.timestamp_file, 'w') as f: 267 f.write(str(int(time.time() - 2 * SECONDS_PER_DAY))) 268 command_runner.raw_input = lambda p: 'n' 269 self.assertEqual( 270 False, 271 self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0)) 272 273 @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config') 274 def test_update_check_skipped_with_quiet_mode(self): 275 """Tests that update isn't triggered when loglevel is in quiet mode.""" 276 with SetBotoConfigForTest([ 277 ('GSUtil', 'software_update_check_period', '1')]): 278 with open(self.timestamp_file, 'w') as f: 279 f.write(str(int(time.time() - 2 * SECONDS_PER_DAY))) 280 281 expected = not self._IsPackageOrCloudSDKInstall() 282 self.assertEqual( 283 expected, 284 self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0)) 285 286 prev_loglevel = logging.getLogger().getEffectiveLevel() 287 try: 288 logging.getLogger().setLevel(logging.ERROR) 289 # With reduced loglevel, should return False. 290 self.assertEqual( 291 False, 292 self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0)) 293 finally: 294 logging.getLogger().setLevel(prev_loglevel) 295 296 def test_command_argument_parser_setup_invalid_completer(self): 297 298 command_map = { 299 FakeCommandWithInvalidCompleter.command_spec.command_name: 300 FakeCommandWithInvalidCompleter() 301 } 302 303 runner = CommandRunner( 304 bucket_storage_uri_class=self.mock_bucket_storage_uri, 305 gsutil_api_class_map_factory=self.mock_gsutil_api_class_map_factory, 306 command_map=command_map) 307 308 subparsers = FakeArgparseSubparsers() 309 try: 310 runner.ConfigureCommandArgumentParsers(subparsers) 311 except RuntimeError as e: 312 self.assertIn('Unknown completer', e.message) 313 314 @unittest.skipUnless(ARGCOMPLETE_AVAILABLE, 315 'Tab completion requires argcomplete') 316 def test_command_argument_parser_setup_completers(self): 317 318 command_map = { 319 FakeCommandWithCompleters.command_spec.command_name: 320 FakeCommandWithCompleters() 321 } 322 323 runner = CommandRunner( 324 bucket_storage_uri_class=self.mock_bucket_storage_uri, 325 gsutil_api_class_map_factory=self.mock_gsutil_api_class_map_factory, 326 command_map=command_map) 327 328 subparsers = FakeArgparseSubparsers() 329 runner.ConfigureCommandArgumentParsers(subparsers) 330 331 self.assertEqual(1, len(subparsers.parsers)) 332 parser = subparsers.parsers[0] 333 self.assertEqual(6, len(parser.arguments)) 334 self.assertEqual(CloudObjectCompleter, type(parser.arguments[0].completer)) 335 self.assertEqual(LocalObjectCompleter, type(parser.arguments[1].completer)) 336 self.assertEqual( 337 CloudOrLocalObjectCompleter, type(parser.arguments[2].completer)) 338 self.assertEqual( 339 NoOpCompleter, type(parser.arguments[3].completer)) 340 self.assertEqual(CloudObjectCompleter, type(parser.arguments[4].completer)) 341 self.assertEqual( 342 LocalObjectOrCannedACLCompleter, type(parser.arguments[5].completer)) 343 344 # pylint: disable=invalid-encoded-data 345 def test_valid_arg_coding(self): 346 """Tests that gsutil encodes valid args correctly.""" 347 # Args other than -h and -p should be utf-8 decoded. 348 args = HandleArgCoding(['ls', '-l']) 349 self.assertIs(type(args[0]), unicode) 350 self.assertIs(type(args[1]), unicode) 351 352 # -p and -h args other than x-goog-meta should not be decoded. 353 args = HandleArgCoding(['ls', '-p', 'abc:def', 'gs://bucket']) 354 self.assertIs(type(args[0]), unicode) 355 self.assertIs(type(args[1]), unicode) 356 self.assertIsNot(type(args[2]), unicode) 357 self.assertIs(type(args[3]), unicode) 358 359 args = HandleArgCoding(['gsutil', '-h', 'content-type:text/plain', 'cp', 360 'a', 'gs://bucket']) 361 self.assertIs(type(args[0]), unicode) 362 self.assertIs(type(args[1]), unicode) 363 self.assertIsNot(type(args[2]), unicode) 364 self.assertIs(type(args[3]), unicode) 365 self.assertIs(type(args[4]), unicode) 366 self.assertIs(type(args[5]), unicode) 367 368 # -h x-goog-meta args should be decoded. 369 args = HandleArgCoding(['gsutil', '-h', 'x-goog-meta-abc', '1234']) 370 self.assertIs(type(args[0]), unicode) 371 self.assertIs(type(args[1]), unicode) 372 self.assertIs(type(args[2]), unicode) 373 self.assertIs(type(args[3]), unicode) 374 375 # -p and -h args with non-ASCII content should raise CommandException. 376 try: 377 HandleArgCoding(['ls', '-p', '碼']) 378 # Ensure exception is raised. 379 self.assertTrue(False) 380 except CommandException as e: 381 self.assertIn('Invalid non-ASCII header', e.reason) 382 try: 383 HandleArgCoding(['-h', '碼', 'ls']) 384 # Ensure exception is raised. 385 self.assertTrue(False) 386 except CommandException as e: 387 self.assertIn('Invalid non-ASCII header', e.reason) 388 389 390class TestCommandRunnerIntegrationTests( 391 testcase.GsUtilIntegrationTestCase): 392 """Integration tests for gsutil update check in command_runner module.""" 393 394 def setUp(self): 395 """Sets up the command runner mock objects.""" 396 super(TestCommandRunnerIntegrationTests, self).setUp() 397 398 # Mock out the timestamp file so we can manipulate it. 399 self.previous_update_file = ( 400 command_runner.LAST_CHECKED_FOR_GSUTIL_UPDATE_TIMESTAMP_FILE) 401 self.timestamp_file = self.CreateTempFile(contents='0') 402 command_runner.LAST_CHECKED_FOR_GSUTIL_UPDATE_TIMESTAMP_FILE = ( 403 self.timestamp_file) 404 405 # Mock out raw_input to trigger yes prompt. 406 command_runner.raw_input = lambda p: 'y' 407 408 def tearDown(self): 409 """Tears down the command runner mock objects.""" 410 super(TestCommandRunnerIntegrationTests, self).tearDown() 411 command_runner.LAST_CHECKED_FOR_GSUTIL_UPDATE_TIMESTAMP_FILE = ( 412 self.previous_update_file) 413 command_runner.raw_input = raw_input 414 415 @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config') 416 def test_lookup_version_without_credentials(self): 417 """Tests that gsutil tarball version lookup works without credentials.""" 418 with SetBotoConfigFileForTest(self.CreateTempFile( 419 contents='[GSUtil]\nsoftware_update_check_period=1')): 420 self.command_runner = command_runner.CommandRunner() 421 # Looking up software version shouldn't get auth failure exception. 422 self.command_runner.RunNamedCommand('ls', [GSUTIL_PUB_TARBALL]) 423