• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# -*- coding: utf-8 -*-
2# Copyright 2011 Google Inc. All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Implementation of gsutil test command."""
16
17from __future__ import absolute_import
18
19from collections import namedtuple
20import logging
21import os
22import subprocess
23import sys
24import tempfile
25import textwrap
26import time
27
28import gslib
29from gslib.command import Command
30from gslib.command import ResetFailureCount
31from gslib.exception import CommandException
32from gslib.project_id import PopulateProjectId
33import gslib.tests as tests
34from gslib.util import IS_WINDOWS
35from gslib.util import NO_MAX
36
37
38# For Python 2.6, unittest2 is required to run the tests. If it's not available,
39# display an error if the test command is run instead of breaking the whole
40# program.
41# pylint: disable=g-import-not-at-top
42try:
43  from gslib.tests.util import GetTestNames
44  from gslib.tests.util import unittest
45except ImportError as e:
46  if 'unittest2' in str(e):
47    unittest = None
48    GetTestNames = None  # pylint: disable=invalid-name
49  else:
50    raise
51
52
53try:
54  import coverage
55except ImportError:
56  coverage = None
57
58
59_DEFAULT_TEST_PARALLEL_PROCESSES = 5
60_DEFAULT_S3_TEST_PARALLEL_PROCESSES = 50
61_SEQUENTIAL_ISOLATION_FLAG = 'sequential_only'
62
63
64_SYNOPSIS = """
65  gsutil test [-l] [-u] [-f] [command command...]
66"""
67
68_DETAILED_HELP_TEXT = ("""
69<B>SYNOPSIS</B>
70""" + _SYNOPSIS + """
71
72
73<B>DESCRIPTION</B>
74  The gsutil test command runs the gsutil unit tests and integration tests.
75  The unit tests use an in-memory mock storage service implementation, while
76  the integration tests send requests to the production service using the
77  preferred API set in the boto configuration file (see "gsutil help apis" for
78  details).
79
80  To run both the unit tests and integration tests, run the command with no
81  arguments:
82
83    gsutil test
84
85  To run the unit tests only (which run quickly):
86
87    gsutil test -u
88
89  Tests run in parallel regardless of whether the top-level -m flag is
90  present. To limit the number of tests run in parallel to 10 at a time:
91
92    gsutil test -p 10
93
94  To force tests to run sequentially:
95
96    gsutil test -p 1
97
98  To have sequentially-run tests stop running immediately when an error occurs:
99
100    gsutil test -f
101
102  To run tests for one or more individual commands add those commands as
103  arguments. For example, the following command will run the cp and mv command
104  tests:
105
106    gsutil test cp mv
107
108  To list available tests, run the test command with the -l argument:
109
110    gsutil test -l
111
112  The tests are defined in the code under the gslib/tests module. Each test
113  file is of the format test_[name].py where [name] is the test name you can
114  pass to this command. For example, running "gsutil test ls" would run the
115  tests in "gslib/tests/test_ls.py".
116
117  You can also run an individual test class or function name by passing the
118  test module followed by the class name and optionally a test name. For
119  example, to run the an entire test class by name:
120
121    gsutil test naming.GsutilNamingTests
122
123  or an individual test function:
124
125    gsutil test cp.TestCp.test_streaming
126
127  You can list the available tests under a module or class by passing arguments
128  with the -l option. For example, to list all available test functions in the
129  cp module:
130
131    gsutil test -l cp
132
133  To output test coverage:
134
135    gsutil test -c -p 500
136    coverage html
137
138  This will output an HTML report to a directory named 'htmlcov'.
139
140
141<B>OPTIONS</B>
142  -c          Output coverage information.
143
144  -f          Exit on first sequential test failure.
145
146  -l          List available tests.
147
148  -p N        Run at most N tests in parallel. The default value is %d.
149
150  -s          Run tests against S3 instead of GS.
151
152  -u          Only run unit tests.
153""" % _DEFAULT_TEST_PARALLEL_PROCESSES)
154
155
# Record of one finished test subprocess: the test's name, its exit code, and
# its captured stdout/stderr text.
TestProcessData = namedtuple(
    'TestProcessData', ['name', 'return_code', 'stdout', 'stderr'])
158
159
def MakeCustomTestResultClass(total_tests):
  """Creates a closure of CustomTestResult.

  Args:
    total_tests: The total number of tests being run.

  Returns:
    The CustomTestResult class (not an instance), which closes over
    total_tests so its progress lines can show "N/total finished".
  """

  class CustomTestResult(unittest.TextTestResult):
    """A subclass of unittest.TextTestResult that prints a progress report."""

    def startTest(self, test):
      """Prints a progress line before delegating to the parent startTest."""
      super(CustomTestResult, self).startTest(test)
      if self.dots:
        # Keep only the trailing "Class.method" portion of the full test id.
        test_id = '.'.join(test.id().split('.')[-2:])
        message = ('\r%d/%d finished - E[%d] F[%d] s[%d] - %s' % (
            self.testsRun, total_tests, len(self.errors),
            len(self.failures), len(self.skipped), test_id))
        # Truncate and pad to a fixed width so successive \r-prefixed lines
        # fully overwrite each other on the terminal.
        message = message[:73]
        message = message.ljust(73)
        self.stream.write('%s - ' % message)

  return CustomTestResult
185
186
def GetTestNamesFromSuites(test_suite):
  """Takes a list of test suites and returns a list of contained test names."""
  prefix_len = len('gslib.tests.test_')
  pending = [test_suite]
  names = []
  # Walk the suite tree: nested suites are pushed for later expansion, leaf
  # test cases contribute their id (sans the module-path prefix).
  while pending:
    current = pending.pop()
    for member in current:
      if isinstance(member, unittest.TestSuite):
        pending.append(member)
      else:
        names.append(member.id()[prefix_len:])
  return names
199
200
# pylint: disable=protected-access
# Need to get into the guts of unittest to evaluate test cases for parallelism.
def TestCaseToName(test_case):
  """Converts a python.unittest to its gsutil test-callable name."""
  # str(cls) looks like "<class 'module.ClassName'>"; the dotted path sits
  # between the single quotes.
  class_path = str(test_case.__class__).split('\'')[1]
  return '%s.%s' % (class_path, test_case._testMethodName)
207
208
# pylint: disable=protected-access
# Need to get into the guts of unittest to evaluate test cases for parallelism.
def SplitParallelizableTestSuite(test_suite):
  """Splits a test suite into groups with different running properties.

  Args:
    test_suite: A python unittest test suite.

  Returns:
    4-part tuple of lists of test names:
    (tests that must be run sequentially,
     tests that must be isolated in a separate process but can be run either
         sequentially or in parallel,
     unit tests that can be run in parallel,
     integration tests that can run in parallel)
  """
  # pylint: disable=import-not-at-top
  # Need to import this after test globals are set so that skip functions work.
  from gslib.tests.testcase.unit_testcase import GsUtilUnitTestCase
  sequential = []
  isolated = []
  parallel_unit = []
  parallel_integration = []

  # Flatten the (possibly nested) suite tree into a flat list of test cases.
  pending = [test_suite]
  flattened_cases = []
  while pending:
    node = pending.pop()
    if isinstance(node, unittest.suite.TestSuite):
      pending.extend(node._tests)
    elif isinstance(node, unittest.TestCase):
      flattened_cases.append(node)

  # Bucket each case by the markers set on its test method.
  for case in flattened_cases:
    bound_method = getattr(case, case._testMethodName, None)
    callable_name = TestCaseToName(case)
    if getattr(bound_method, 'requires_isolation', False):
      # Test must be isolated to a separate process, even if it is being
      # run sequentially.
      isolated.append(callable_name)
    elif not getattr(bound_method, 'is_parallelizable', True):
      sequential.append(callable_name)
    elif isinstance(case, GsUtilUnitTestCase):
      parallel_unit.append(callable_name)
    else:
      parallel_integration.append(callable_name)

  return (sorted(sequential),
          sorted(isolated),
          sorted(parallel_unit),
          sorted(parallel_integration))
261
262
def CountFalseInList(input_list):
  """Returns the number of falsy entries in input_list."""
  return sum(1 for entry in input_list if not entry)
270
271
def CreateTestProcesses(parallel_tests, test_index, process_list, process_done,
                        max_parallel_tests, root_coverage_file=None):
  """Creates test processes to run tests in parallel.

  Each created process runs a single test by re-invoking gsutil as a child
  process ("gsutil test --sequential_only <test name>") so the test runs
  fully isolated from this process. process_list and process_done are
  mutated in place so the caller can poll them.

  Args:
    parallel_tests: List of all parallel tests.
    test_index: List index of last created test before this function call.
    process_list: List of running subprocesses. Created processes are appended
                  to this list.
    process_done: List of booleans indicating process completion. One 'False'
                  will be added per process created.
    max_parallel_tests: Maximum number of tests to run in parallel.
    root_coverage_file: The root .coverage filename if coverage is requested.

  Returns:
    Index of last created test.
  """
  orig_test_index = test_index
  # On Windows the gsutil script must be launched via the Python interpreter.
  executable_prefix = [sys.executable] if sys.executable and IS_WINDOWS else []
  s3_argument = ['-s'] if tests.util.RUN_S3_TESTS else []

  process_create_start_time = time.time()
  last_log_time = process_create_start_time
  # CountFalseInList(process_done) is the number of still-running processes;
  # keep spawning until the parallelism cap is reached or tests run out.
  while (CountFalseInList(process_done) < max_parallel_tests and
         test_index < len(parallel_tests)):
    env = os.environ.copy()
    if root_coverage_file:
      # Tell the child gsutil where to write its coverage data so it can
      # later be combined with this process's coverage data.
      env['GSUTIL_COVERAGE_OUTPUT_FILE'] = root_coverage_file
    # Strip the 'gslib.tests.test_' prefix to get the test-callable name and
    # pass the sequential-isolation flag so the child doesn't try to
    # parallelize again (which would recurse indefinitely).
    process_list.append(subprocess.Popen(
        executable_prefix + [gslib.GSUTIL_PATH] +
        ['-o', 'GSUtil:default_project_id=' + PopulateProjectId()] +
        ['test'] + s3_argument +
        ['--' + _SEQUENTIAL_ISOLATION_FLAG] +
        [parallel_tests[test_index][len('gslib.tests.test_'):]],
        stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env))
    test_index += 1
    process_done.append(False)
    # Log creation progress at most once every 5 seconds.
    if time.time() - last_log_time > 5:
      print ('Created %d new processes (total %d/%d created)' %
             (test_index - orig_test_index, len(process_list),
              len(parallel_tests)))
      last_log_time = time.time()
  if test_index == len(parallel_tests):
    print ('Test process creation finished (%d/%d created)' %
           (len(process_list), len(parallel_tests)))
  return test_index
318
319
class TestCommand(Command):
  """Implementation of gsutil test command."""

  # Command specification. See base class for documentation.
  command_spec = Command.CreateCommandSpec(
      'test',
      command_name_aliases=[],
      usage_synopsis=_SYNOPSIS,
      min_args=0,
      max_args=NO_MAX,
      supported_sub_args='uflp:sc',
      file_url_ok=True,
      provider_url_ok=False,
      urls_start_arg=0,
      supported_private_args=[_SEQUENTIAL_ISOLATION_FLAG]
  )
  # Help specification. See help_provider.py for documentation.
  help_spec = Command.HelpSpec(
      help_name='test',
      help_name_aliases=[],
      help_type='command_help',
      help_one_line_summary='Run gsutil tests',
      help_text=_DETAILED_HELP_TEXT,
      subcommand_help_text={},
  )

  def RunParallelTests(self, parallel_integration_tests,
                       max_parallel_tests, coverage_filename):
    """Executes the parallel/isolated portion of the test suite.

    Each test runs in its own child gsutil process (see
    CreateTestProcesses); this method polls those processes, replenishes
    the pool as slots free up, and collects the results.

    Args:
      parallel_integration_tests: List of tests to execute.
      max_parallel_tests: Maximum number of parallel tests to run at once.
      coverage_filename: If not None, filename for coverage output.

    Returns:
      (int number of test failures, float elapsed time)
    """
    process_list = []
    process_done = []
    process_results = []  # Tuples of (name, return code, stdout, stderr)
    num_parallel_failures = 0
    # Number of logging cycles we ran with no progress.
    progress_less_logging_cycles = 0
    completed_as_of_last_log = 0
    num_parallel_tests = len(parallel_integration_tests)
    parallel_start_time = last_log_time = time.time()
    test_index = CreateTestProcesses(
        parallel_integration_tests, 0, process_list, process_done,
        max_parallel_tests, root_coverage_file=coverage_filename)
    # Poll until every test has produced a result.
    while len(process_results) < num_parallel_tests:
      # Reap any processes that have finished since the last pass.
      for proc_num in xrange(len(process_list)):
        if process_done[proc_num] or process_list[proc_num].poll() is None:
          continue
        process_done[proc_num] = True
        stdout, stderr = process_list[proc_num].communicate()
        # Close the pipes explicitly to release file descriptors promptly.
        process_list[proc_num].stdout.close()
        process_list[proc_num].stderr.close()
        # TODO: Differentiate test failures from errors.
        if process_list[proc_num].returncode != 0:
          num_parallel_failures += 1
        process_results.append(TestProcessData(
            name=parallel_integration_tests[proc_num],
            return_code=process_list[proc_num].returncode,
            stdout=stdout, stderr=stderr))
      # Top up the process pool if there are tests left to start.
      if len(process_list) < num_parallel_tests:
        test_index = CreateTestProcesses(
            parallel_integration_tests, test_index, process_list,
            process_done, max_parallel_tests,
            root_coverage_file=coverage_filename)
      if len(process_results) < num_parallel_tests:
        # Log progress at most once every 5 seconds.
        if time.time() - last_log_time > 5:
          print '%d/%d finished - %d failures' % (
              len(process_results), num_parallel_tests, num_parallel_failures)
          if len(process_results) == completed_as_of_last_log:
            progress_less_logging_cycles += 1
          else:
            completed_as_of_last_log = len(process_results)
            # A process completed, so we made progress.
            progress_less_logging_cycles = 0
          if progress_less_logging_cycles > 4:
            # Ran 5 or more logging cycles with no progress, let the user
            # know which tests are running slowly or hanging.
            still_running = []
            for proc_num in xrange(len(process_list)):
              if not process_done[proc_num]:
                still_running.append(parallel_integration_tests[proc_num])
            print 'Still running: %s' % still_running
            # TODO: Terminate still-running processes if they
            # hang for a long time.
          last_log_time = time.time()
        # Avoid busy-waiting between polling passes.
        time.sleep(1)
    process_run_finish_time = time.time()
    # Dump captured stderr for each failed test so the user can diagnose it.
    if num_parallel_failures:
      for result in process_results:
        if result.return_code != 0:
          new_stderr = result.stderr.split('\n')
          print 'Results for failed test %s:' % result.name
          for line in new_stderr:
            print line

    return (num_parallel_failures,
            (process_run_finish_time - parallel_start_time))

  def PrintTestResults(self, num_sequential_tests, sequential_success,
                       sequential_time_elapsed,
                       num_parallel_tests, num_parallel_failures,
                       parallel_time_elapsed):
    """Prints test results for parallel and sequential tests.

    Args:
      num_sequential_tests: Number of tests that ran sequentially.
      sequential_success: True if all sequential tests passed.
      sequential_time_elapsed: Elapsed seconds for the sequential portion.
      num_parallel_tests: Number of tests that ran in parallel.
      num_parallel_failures: Number of parallel test failures.
      parallel_time_elapsed: Elapsed seconds for the parallel portion.
    """
    # TODO: Properly track test skips.
    print 'Parallel tests complete. Success: %s Fail: %s' % (
        num_parallel_tests - num_parallel_failures, num_parallel_failures)
    print (
        'Ran %d tests in %.3fs (%d sequential in %.3fs, %d parallel in %.3fs)'
        % (num_parallel_tests + num_sequential_tests,
           float(sequential_time_elapsed + parallel_time_elapsed),
           num_sequential_tests,
           float(sequential_time_elapsed),
           num_parallel_tests,
           float(parallel_time_elapsed)))
    print

    if not num_parallel_failures and sequential_success:
      print 'OK'
    else:
      if num_parallel_failures:
        print 'FAILED (parallel tests)'
      if not sequential_success:
        print 'FAILED (sequential tests)'

  def RunCommand(self):
    """Command entry point for the test command.

    Returns:
      0 if all tests passed; 1 otherwise.

    Raises:
      CommandException: If required test dependencies are missing or an
          invalid test argument name is supplied.
    """
    # unittest is None when the unittest2 backport failed to import on 2.6.
    if not unittest:
      raise CommandException('On Python 2.6, the unittest2 module is required '
                             'to run the gsutil tests.')

    # Parse command-line options into local flags / tests.util globals.
    failfast = False
    list_tests = False
    max_parallel_tests = _DEFAULT_TEST_PARALLEL_PROCESSES
    perform_coverage = False
    sequential_only = False
    if self.sub_opts:
      for o, a in self.sub_opts:
        if o == '-c':
          perform_coverage = True
        elif o == '-f':
          failfast = True
        elif o == '-l':
          list_tests = True
        elif o == ('--' + _SEQUENTIAL_ISOLATION_FLAG):
          # Called to isolate a single test in a separate process.
          # Don't try to isolate it again (would lead to an infinite loop).
          sequential_only = True
        elif o == '-p':
          max_parallel_tests = long(a)
        elif o == '-s':
          if not tests.util.HAS_S3_CREDS:
            raise CommandException('S3 tests require S3 credentials. Please '
                                   'add appropriate credentials to your .boto '
                                   'file and re-run.')
          tests.util.RUN_S3_TESTS = True
        elif o == '-u':
          tests.util.RUN_INTEGRATION_TESTS = False

    if perform_coverage and not coverage:
      raise CommandException(
          'Coverage has been requested but the coverage module was not found. '
          'You can install it with "pip install coverage".')

    # Cap S3 parallelism; each parallel test consumes S3 buckets, which are
    # limited per account.
    if (tests.util.RUN_S3_TESTS and
          max_parallel_tests > _DEFAULT_S3_TEST_PARALLEL_PROCESSES):
      self.logger.warn(
          'Reducing parallel tests to %d due to S3 maximum bucket '
          'limitations.', _DEFAULT_S3_TEST_PARALLEL_PROCESSES)
      max_parallel_tests = _DEFAULT_S3_TEST_PARALLEL_PROCESSES

    test_names = sorted(GetTestNames())
    # With -l and no args, just list the top-level test module names.
    if list_tests and not self.args:
      print 'Found %d test names:' % len(test_names)
      print ' ', '\n  '.join(sorted(test_names))
      return 0

    # Set list of commands to test if supplied.
    if self.args:
      commands_to_test = []
      for name in self.args:
        if name in test_names or name.split('.')[0] in test_names:
          commands_to_test.append('gslib.tests.test_%s' % name)
        else:
          commands_to_test.append(name)
    else:
      commands_to_test = ['gslib.tests.test_%s' % name for name in test_names]

    # Installs a ctrl-c handler that tries to cleanly tear down tests.
    unittest.installHandler()

    loader = unittest.TestLoader()

    if commands_to_test:
      try:
        suite = loader.loadTestsFromNames(commands_to_test)
      except (ImportError, AttributeError) as e:
        raise CommandException('Invalid test argument name: %s' % e)

    # With -l and args, list the individual test names within the given
    # modules/classes.
    if list_tests:
      test_names = GetTestNamesFromSuites(suite)
      print 'Found %d test names:' % len(test_names)
      print ' ', '\n  '.join(sorted(test_names))
      return 0

    # When gsutil logging is at INFO or finer, keep test output terse;
    # otherwise use verbose test output and suppress logging noise.
    if logging.getLogger().getEffectiveLevel() <= logging.INFO:
      verbosity = 1
    else:
      verbosity = 2
      logging.disable(logging.ERROR)

    if perform_coverage:
      # We want to run coverage over the gslib module, but filter out the test
      # modules and any third-party code. We also filter out anything under the
      # temporary directory. Otherwise, the gsutil update test (which copies
      # code to the temporary directory) gets included in the output.
      coverage_controller = coverage.coverage(
          source=['gslib'], omit=['gslib/third_party/*', 'gslib/tests/*',
                                  tempfile.gettempdir() + '*'])
      coverage_controller.erase()
      coverage_controller.start()

    num_parallel_failures = 0
    sequential_success = False

    (sequential_tests, isolated_tests,
     parallel_unit_tests, parallel_integration_tests) = (
         SplitParallelizableTestSuite(suite))

    # Since parallel integration tests are run in a separate process, they
    # won't get the override to tests.util, so skip them here.
    if not tests.util.RUN_INTEGRATION_TESTS:
      parallel_integration_tests = []

    logging.debug('Sequential tests to run: %s', sequential_tests)
    logging.debug('Isolated tests to run: %s', isolated_tests)
    logging.debug('Parallel unit tests to run: %s', parallel_unit_tests)
    logging.debug('Parallel integration tests to run: %s',
                  parallel_integration_tests)

    # If we're running an already-isolated test (spawned in isolation by a
    # previous test process), or we have no parallel tests to run,
    # just run sequentially. For now, unit tests are always run sequentially.
    run_tests_sequentially = (sequential_only or
                              (len(parallel_integration_tests) <= 1
                               and not isolated_tests))

    if run_tests_sequentially:
      total_tests = suite.countTestCases()
      resultclass = MakeCustomTestResultClass(total_tests)

      runner = unittest.TextTestRunner(verbosity=verbosity,
                                       resultclass=resultclass,
                                       failfast=failfast)
      ret = runner.run(suite)
      sequential_success = ret.wasSuccessful()
    else:
      if max_parallel_tests == 1:
        # We can't take advantage of parallelism, though we may have tests that
        # need isolation.
        sequential_tests += parallel_integration_tests
        parallel_integration_tests = []

      sequential_start_time = time.time()
      # TODO: For now, run unit tests sequentially because they are fast.
      # We could potentially shave off several seconds of execution time
      # by executing them in parallel with the integration tests.
      if len(sequential_tests) + len(parallel_unit_tests):
        print 'Running %d tests sequentially.' % (len(sequential_tests) +
                                                  len(parallel_unit_tests))
        sequential_tests_to_run = sequential_tests + parallel_unit_tests
        suite = loader.loadTestsFromNames(
            sorted([test_name for test_name in sequential_tests_to_run]))
        num_sequential_tests = suite.countTestCases()
        resultclass = MakeCustomTestResultClass(num_sequential_tests)
        runner = unittest.TextTestRunner(verbosity=verbosity,
                                         resultclass=resultclass,
                                         failfast=failfast)

        ret = runner.run(suite)
        sequential_success = ret.wasSuccessful()
      else:
        num_sequential_tests = 0
        sequential_success = True
      sequential_time_elapsed = time.time() - sequential_start_time

      # At this point, all tests get their own process so just treat the
      # isolated tests as parallel tests.
      parallel_integration_tests += isolated_tests
      num_parallel_tests = len(parallel_integration_tests)

      if not num_parallel_tests:
        pass
      else:
        num_processes = min(max_parallel_tests, num_parallel_tests)
        if num_parallel_tests > 1 and max_parallel_tests > 1:
          message = 'Running %d tests in parallel mode (%d processes).'
          if num_processes > _DEFAULT_TEST_PARALLEL_PROCESSES:
            message += (
                ' Please be patient while your CPU is incinerated. '
                'If your machine becomes unresponsive, consider reducing '
                'the amount of parallel test processes by running '
                '\'gsutil test -p <num_processes>\'.')
          print ('\n'.join(textwrap.wrap(
              message % (num_parallel_tests, num_processes))))
        else:
          print ('Running %d tests sequentially in isolated processes.' %
                 num_parallel_tests)
        (num_parallel_failures, parallel_time_elapsed) = self.RunParallelTests(
            parallel_integration_tests, max_parallel_tests,
            coverage_controller.data.filename if perform_coverage else None)
        self.PrintTestResults(
            num_sequential_tests, sequential_success,
            sequential_time_elapsed,
            num_parallel_tests, num_parallel_failures,
            parallel_time_elapsed)

    if perform_coverage:
      # Merge coverage data written by the child test processes with this
      # process's data and persist it.
      coverage_controller.stop()
      coverage_controller.combine()
      coverage_controller.save()
      print ('Coverage information was saved to: %s' %
             coverage_controller.data.filename)

    if sequential_success and not num_parallel_failures:
      ResetFailureCount()
      return 0
    return 1
653