• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from __future__ import absolute_import
16
17import collections
18import os
19import select
20import signal
21import sys
22import tempfile
23import threading
24import time
25import unittest
26import uuid
27
28import six
29from six import moves
30
31from tests import _loader
32from tests import _result
33
34
35class CaptureFile(object):
36    """A context-managed file to redirect output to a byte array.
37
38  Use by invoking `start` (`__enter__`) and at some point invoking `stop`
39  (`__exit__`). At any point after the initial call to `start` call `output` to
40  get the current redirected output. Note that we don't currently use file
41  locking, so calling `output` between calls to `start` and `stop` may muddle
42  the result (you should only be doing this during a Python-handled interrupt as
43  a last ditch effort to provide output to the user).
44
45  Attributes:
46    _redirected_fd (int): File descriptor of file to redirect writes from.
47    _saved_fd (int): A copy of the original value of the redirected file
48      descriptor.
49    _into_file (TemporaryFile or None): File to which writes are redirected.
50      Only non-None when self is started.
51  """
52
53    def __init__(self, fd):
54        self._redirected_fd = fd
55        self._saved_fd = os.dup(self._redirected_fd)
56        self._into_file = None
57
58    def output(self):
59        """Get all output from the redirected-to file if it exists."""
60        if self._into_file:
61            self._into_file.seek(0)
62            return bytes(self._into_file.read())
63        else:
64            return bytes()
65
66    def start(self):
67        """Start redirection of writes to the file descriptor."""
68        self._into_file = tempfile.TemporaryFile()
69        os.dup2(self._into_file.fileno(), self._redirected_fd)
70
71    def stop(self):
72        """Stop redirection of writes to the file descriptor."""
73        # n.b. this dup2 call auto-closes self._redirected_fd
74        os.dup2(self._saved_fd, self._redirected_fd)
75
76    def write_bypass(self, value):
77        """Bypass the redirection and write directly to the original file.
78
79    Arguments:
80      value (str): What to write to the original file.
81    """
82        if six.PY3 and not isinstance(value, six.binary_type):
83            value = value.encode('ascii')
84        if self._saved_fd is None:
85            os.write(self._redirect_fd, value)
86        else:
87            os.write(self._saved_fd, value)
88
89    def __enter__(self):
90        self.start()
91        return self
92
93    def __exit__(self, type, value, traceback):
94        self.stop()
95
96    def close(self):
97        """Close any resources used by self not closed by stop()."""
98        os.close(self._saved_fd)
99
100
101class AugmentedCase(collections.namedtuple('AugmentedCase', ['case', 'id'])):
102    """A test case with a guaranteed unique externally specified identifier.
103
104  Attributes:
105    case (unittest.TestCase): TestCase we're decorating with an additional
106      identifier.
107    id (object): Any identifier that may be considered 'unique' for testing
108      purposes.
109  """
110
111    def __new__(cls, case, id=None):
112        if id is None:
113            id = uuid.uuid4()
114        return super(cls, AugmentedCase).__new__(cls, case, id)
115
116
117# NOTE(lidiz) This complex wrapper is not triggering setUpClass nor
118# tearDownClass. Do not use those methods, or fix this wrapper!
119class Runner(object):
120
121    def __init__(self, dedicated_threads=False):
122        """Constructs the Runner object.
123
124        Args:
125          dedicated_threads: A bool indicates whether to spawn each unit test
126            in separate thread or not.
127        """
128        self._skipped_tests = []
129        self._dedicated_threads = dedicated_threads
130
131    def skip_tests(self, tests):
132        self._skipped_tests = tests
133
134    def run(self, suite):
135        """See setuptools' test_runner setup argument for information."""
136        # only run test cases with id starting with given prefix
137        testcase_filter = os.getenv('GRPC_PYTHON_TESTRUNNER_FILTER')
138        filtered_cases = []
139        for case in _loader.iterate_suite_cases(suite):
140            if not testcase_filter or case.id().startswith(testcase_filter):
141                filtered_cases.append(case)
142
143        # Ensure that every test case has no collision with any other test case in
144        # the augmented results.
145        augmented_cases = [
146            AugmentedCase(case, uuid.uuid4()) for case in filtered_cases
147        ]
148        case_id_by_case = dict((augmented_case.case, augmented_case.id)
149                               for augmented_case in augmented_cases)
150        result_out = moves.cStringIO()
151        result = _result.TerminalResult(
152            result_out, id_map=lambda case: case_id_by_case[case])
153        stdout_pipe = CaptureFile(sys.stdout.fileno())
154        stderr_pipe = CaptureFile(sys.stderr.fileno())
155        kill_flag = [False]
156
157        def sigint_handler(signal_number, frame):
158            if signal_number == signal.SIGINT:
159                kill_flag[0] = True  # Python 2.7 not having 'local'... :-(
160            signal.signal(signal_number, signal.SIG_DFL)
161
162        def fault_handler(signal_number, frame):
163            stdout_pipe.write_bypass(
164                'Received fault signal {}\nstdout:\n{}\n\nstderr:{}\n'.format(
165                    signal_number, stdout_pipe.output(), stderr_pipe.output()))
166            os._exit(1)
167
168        def check_kill_self():
169            if kill_flag[0]:
170                stdout_pipe.write_bypass('Stopping tests short...')
171                result.stopTestRun()
172                stdout_pipe.write_bypass(result_out.getvalue())
173                stdout_pipe.write_bypass('\ninterrupted stdout:\n{}\n'.format(
174                    stdout_pipe.output().decode()))
175                stderr_pipe.write_bypass('\ninterrupted stderr:\n{}\n'.format(
176                    stderr_pipe.output().decode()))
177                os._exit(1)
178
179        def try_set_handler(name, handler):
180            try:
181                signal.signal(getattr(signal, name), handler)
182            except AttributeError:
183                pass
184
185        try_set_handler('SIGINT', sigint_handler)
186        try_set_handler('SIGBUS', fault_handler)
187        try_set_handler('SIGABRT', fault_handler)
188        try_set_handler('SIGFPE', fault_handler)
189        try_set_handler('SIGILL', fault_handler)
190        # Sometimes output will lag after a test has successfully finished; we
191        # ignore such writes to our pipes.
192        try_set_handler('SIGPIPE', signal.SIG_IGN)
193
194        # Run the tests
195        result.startTestRun()
196        for augmented_case in augmented_cases:
197            for skipped_test in self._skipped_tests:
198                if skipped_test in augmented_case.case.id():
199                    break
200            else:
201                sys.stdout.write('Running       {}\n'.format(
202                    augmented_case.case.id()))
203                sys.stdout.flush()
204                if self._dedicated_threads:
205                    # (Deprecated) Spawns dedicated thread for each test case.
206                    case_thread = threading.Thread(
207                        target=augmented_case.case.run, args=(result,))
208                    try:
209                        with stdout_pipe, stderr_pipe:
210                            case_thread.start()
211                            # If the thread is exited unexpected, stop testing.
212                            while case_thread.is_alive():
213                                check_kill_self()
214                                time.sleep(0)
215                            case_thread.join()
216                    except:  # pylint: disable=try-except-raise
217                        # re-raise the exception after forcing the with-block to end
218                        raise
219                    # Records the result of the test case run.
220                    result.set_output(augmented_case.case, stdout_pipe.output(),
221                                      stderr_pipe.output())
222                    sys.stdout.write(result_out.getvalue())
223                    sys.stdout.flush()
224                    result_out.truncate(0)
225                    check_kill_self()
226                else:
227                    # Donates current thread to test case execution.
228                    augmented_case.case.run(result)
229        result.stopTestRun()
230        stdout_pipe.close()
231        stderr_pipe.close()
232
233        # Report results
234        sys.stdout.write(result_out.getvalue())
235        sys.stdout.flush()
236        signal.signal(signal.SIGINT, signal.SIG_DFL)
237        with open('report.xml', 'wb') as report_xml_file:
238            _result.jenkins_junit_xml(result).write(report_xml_file)
239        return result
240