• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2022 The ANGLE Project Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import contextlib
6import datetime
7import fnmatch
8import json
9import importlib
10import io
11import logging
12import os
13import signal
14import subprocess
15import sys
16import threading
17import time
18
19import android_helper
20import angle_path_util
21
22angle_path_util.AddDepsDirToPath('testing/scripts')
23import common
24if sys.platform.startswith('linux'):
25    # vpython3 can handle this on Windows but not python3
26    import xvfb
27
28
# Name of the ANGLE trace test suite. Not referenced in the visible part of this
# module; presumably consumed by scripts that import it -- TODO confirm.
ANGLE_TRACE_TEST_SUITE = 'angle_trace_tests'
30
31
def Initialize(suite_name):
    """Forward suite initialization to android_helper.Initialize().

    Args:
        suite_name: passed through unchanged to android_helper.
    """
    android_helper.Initialize(suite_name)
34
35
def IsAndroid():
    """Return android_helper.IsAndroid().

    Requires .Initialize() to be called first.
    """
    is_android = android_helper.IsAndroid()
    return is_android
39
40
class LogFormatter(logging.Formatter):
    """Compact log formatter: first letter of the level, time of day, message.

    Example line: ``I12:34:56.789012Z some message``.
    """

    def __init__(self):
        logging.Formatter.__init__(self, fmt='%(levelname).1s%(asctime)s %(message)s')

    def formatTime(self, record, datefmt=None):
        """Return the record creation time as UTC ``HH:MM:SS.ffffffZ``.

        Args:
            record: a log record; only its ``created`` timestamp is read.
            datefmt: accepted for logging.Formatter compatibility, ignored.
        """
        # Drop date as these scripts are short lived.
        # The trailing 'Z' designates UTC, so convert explicitly instead of
        # relying on the host's local timezone.
        created = datetime.datetime.fromtimestamp(record.created, tz=datetime.timezone.utc)
        return created.strftime('%H:%M:%S.%fZ')
49
50
def SetupLogging(level):
    """Reset the logging module and log to stdout using LogFormatter.

    Args:
        level: level applied to the root logger.
    """
    # Reload to reset if it was already setup by a library
    importlib.reload(logging)

    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setFormatter(LogFormatter())

    root_logger = logging.getLogger()
    root_logger.setLevel(level)
    root_logger.addHandler(stdout_handler)
61
62
def IsWindows():
    """Return True when sys.platform is 'cygwin' or starts with 'win'."""
    platform_name = sys.platform
    if platform_name == 'cygwin':
        return True
    return platform_name.startswith('win')
65
66
def ExecutablePathInCurrentDir(binary):
    """Return the command that runs `binary` from the current directory.

    On Windows (per IsWindows()) the result is '.\\<binary>.exe', elsewhere
    './<binary>'.
    """
    path_template = '.\\%s.exe' if IsWindows() else './%s'
    return path_template % binary
72
73
def HasGtestShardsAndIndex(env):
    """Tell whether `env` requests gtest sharding across more than one shard.

    Args:
        env: mapping of environment variables.

    Returns:
        True if GTEST_TOTAL_SHARDS is present and not 1 (and GTEST_SHARD_INDEX
        is present as well), False otherwise.

    Exits the process with status 1 when GTEST_TOTAL_SHARDS asks for sharding
    but GTEST_SHARD_INDEX is missing.
    """
    sharding_requested = ('GTEST_TOTAL_SHARDS' in env and
                          int(env['GTEST_TOTAL_SHARDS']) != 1)
    if not sharding_requested:
        return False

    if 'GTEST_SHARD_INDEX' not in env:
        logging.error('Sharding params must be specified together.')
        sys.exit(1)

    return True
82
83
def PopGtestShardsAndIndex(env):
    """Remove the gtest sharding variables from `env` and return them as ints.

    Returns:
        (total_shards, shard_index), taken from GTEST_TOTAL_SHARDS and
        GTEST_SHARD_INDEX in that order.

    Raises:
        KeyError: if either variable is missing from `env`.
    """
    total_shards = int(env.pop('GTEST_TOTAL_SHARDS'))
    shard_index = int(env.pop('GTEST_SHARD_INDEX'))
    return total_shards, shard_index
86
87
# Adapted from testing/test_env.py: also notifies current process and restores original handlers.
@contextlib.contextmanager
def forward_signals(procs):
    """Forward termination signals to `procs` while the with-block runs.

    Installs a handler for SIGBREAK (win32) or SIGINT/SIGTERM (elsewhere) that
    relays the signal to every process in `procs` that is still running. The
    previous handlers are restored when the block exits, including when the
    block raises.

    Args:
        procs: iterable of subprocess.Popen objects.

    Raises:
        KeyboardInterrupt: after a normal exit from the block, if one of the
            handled signals was received while it ran.
    """
    assert all(isinstance(p, subprocess.Popen) for p in procs)

    interrupted_event = threading.Event()

    def _sig_handler(sig, _):
        interrupted_event.set()
        for p in procs:
            if p.poll() is not None:
                continue
            # SIGBREAK is defined only for win32.
            # pylint: disable=no-member
            if sys.platform == 'win32' and sig == signal.SIGBREAK:
                p.send_signal(signal.CTRL_BREAK_EVENT)
            else:
                print("Forwarding signal(%d) to process %d" % (sig, p.pid))
                p.send_signal(sig)
            # pylint: enable=no-member

    if sys.platform == 'win32':
        signals = [signal.SIGBREAK]  # pylint: disable=no-member
    else:
        signals = [signal.SIGINT, signal.SIGTERM]

    original_handlers = {}
    try:
        for sig in signals:
            original_handlers[sig] = signal.signal(sig, _sig_handler)
        yield
    finally:
        # Always restore, even if the with-body raised; otherwise the forwarding
        # handler would stay installed and keep targeting finished processes.
        for sig, handler in original_handlers.items():
            # signal.signal() returns None for handlers not installed from
            # Python; those cannot be passed back to signal.signal().
            if handler is not None:
                signal.signal(sig, handler)

    if interrupted_event.is_set():
        raise KeyboardInterrupt()
125
126
# From testing/test_env.py, see run_command_with_output below
def _popen(*args, **kwargs):
    """Start a subprocess.Popen, using a new process group on win32.

    All arguments are passed through to subprocess.Popen. Callers must not
    supply `creationflags`.
    """
    assert 'creationflags' not in kwargs
    if sys.platform != 'win32':
        return subprocess.Popen(*args, **kwargs)
    # Necessary for signal handling. See crbug.com/733612#c6.
    return subprocess.Popen(
        *args, creationflags=subprocess.CREATE_NEW_PROCESS_GROUP, **kwargs)
134
135
# Forked from testing/test_env.py to add ability to suppress logging with log=False
def run_command_with_output(argv, stdoutfile, env=None, cwd=None, log=True):
    """Run `argv`, capturing stdout and stderr into `stdoutfile`.

    While the process runs, newly written output is echoed to sys.stdout
    (unless log is False). Signals received meanwhile are forwarded to the
    child via forward_signals().

    Args:
        argv: command line, passed to subprocess.Popen.
        stdoutfile: path of the file that receives the combined output.
        env: optional environment for the child process.
        cwd: optional working directory for the child process.
        log: when False, output is only written to `stdoutfile`.

    Returns:
        The exit code of the process.
    """
    import codecs  # Local import: only needed by this function.

    assert stdoutfile
    # Output is read in arbitrary chunks, so a multi-byte UTF-8 character can be
    # split between two reads. An incremental decoder buffers the partial bytes;
    # errors='replace' keeps invalid bytes in test output from raising here.
    decoder = codecs.getincrementaldecoder('utf-8')(errors='replace')

    with io.open(stdoutfile, 'wb') as writer, \
          io.open(stdoutfile, 'rb') as reader:

        def _echo_new_output(final=False):
            if log:
                sys.stdout.write(decoder.decode(reader.read(), final=final))

        process = _popen(argv, env=env, cwd=cwd, stdout=writer, stderr=subprocess.STDOUT)
        with forward_signals([process]):
            while process.poll() is None:
                _echo_new_output()
                # This sleep is needed for signal propagation. See the
                # wait_with_signals() docstring.
                time.sleep(0.1)
            # Drain whatever was written between the last read and process exit.
            _echo_new_output(final=True)
            return process.returncode
152
153
def RunTestSuite(test_suite,
                 cmd_args,
                 env,
                 show_test_stdout=True,
                 use_xvfb=False):
    """Run `test_suite` with `cmd_args` and collect its output and results.

    On Android (per android_helper.IsAndroid()) the run is delegated to
    android_helper.RunTests(). Otherwise the suite is launched as a local
    process, optionally under xvfb.

    Args:
        test_suite: suite name or path. If a file with its basename exists in
            the current directory, it is run from the current directory.
        cmd_args: list of command-line arguments for the suite. If it has no
            --isolated-script-test-output=<path> flag, one pointing at a
            temporary file is appended to the command.
        env: environment for the test process (not used on Android).
        show_test_stdout: whether to echo the test output while running. Not
            forwarded on the xvfb path.
        use_xvfb: run through xvfb.run_executable().

    Returns:
        (exit_code, output, json_results); json_results is None when the
        results file is empty.
    """
    if android_helper.IsAndroid():
        result, output, json_results = android_helper.RunTests(
            test_suite, cmd_args, log_output=show_test_stdout)
        return result, output, json_results

    cmd = ExecutablePathInCurrentDir(test_suite) if os.path.exists(
        os.path.basename(test_suite)) else test_suite
    runner_cmd = [cmd] + cmd_args

    logging.debug(' '.join(runner_cmd))
    with contextlib.ExitStack() as stack:
        stdout_path = stack.enter_context(common.temporary_file())

        output_flag = '--isolated-script-test-output='
        flag_matches = [a for a in cmd_args if a.startswith(output_flag)]
        if flag_matches:
            # Strip only the flag prefix: splitting on every '=' would truncate
            # a results path that itself contains '='.
            results_path = flag_matches[0][len(output_flag):]
        else:
            results_path = stack.enter_context(common.temporary_file())
            runner_cmd += ['--isolated-script-test-output=%s' % results_path]

        if use_xvfb:
            xvfb_whd = '3120x3120x24'  # Max screen dimensions from traces, as per:
            # % egrep 'Width|Height' src/tests/restricted_traces/*/*.json | awk '{print $3 $2}' | sort -n
            exit_code = xvfb.run_executable(
                runner_cmd, env, stdoutfile=stdout_path, xvfb_whd=xvfb_whd)
        else:
            exit_code = run_command_with_output(
                runner_cmd, env=env, stdoutfile=stdout_path, log=show_test_stdout)
        # Read both files while the ExitStack is still open, i.e. before any
        # temporary files it manages are released.
        with open(stdout_path) as f:
            output = f.read()
        with open(results_path) as f:
            data = f.read()
            json_results = json.loads(data) if data else None  # --list-tests => empty file

    return exit_code, output, json_results
194
195
def GetTestsFromOutput(output):
    """Extract the test names listed between the list markers in `output`.

    Returns:
        The lines strictly between the 'Tests list:' line and the
        'End tests list.' line.

    Raises:
        ValueError: if either marker line is absent.
    """
    lines = output.split('\n')
    first_test = lines.index('Tests list:') + 1
    past_last_test = lines.index('End tests list.')
    return lines[first_test:past_last_test]
201
202
def FilterTests(tests, test_filter):
    """Return the sorted, de-duplicated tests matching `test_filter`.

    Args:
        tests: test names to select from.
        test_filter: one or more fnmatch patterns separated by ':'; a test is
            kept when it matches any of them.
    """
    selected = {
        name for pattern in test_filter.split(':') for name in fnmatch.filter(tests, pattern)
    }
    return sorted(selected)
208