1from __future__ import absolute_import 2import os 3import subprocess 4import sys 5 6import lit.Test 7import lit.TestRunner 8import lit.util 9from lit.formats.base import TestFormat 10 11kIsWindows = sys.platform in ["win32", "cygwin"] 12 13 14class GoogleBenchmark(TestFormat): 15 def __init__(self, test_sub_dirs, test_suffix, benchmark_args=[]): 16 self.benchmark_args = list(benchmark_args) 17 self.test_sub_dirs = os.path.normcase(str(test_sub_dirs)).split(";") 18 19 # On Windows, assume tests will also end in '.exe'. 20 exe_suffix = str(test_suffix) 21 if kIsWindows: 22 exe_suffix += ".exe" 23 24 # Also check for .py files for testing purposes. 25 self.test_suffixes = {exe_suffix, test_suffix + ".py"} 26 27 def getBenchmarkTests(self, path, litConfig, localConfig): 28 """getBenchmarkTests(path) - [name] 29 30 Return the tests available in gtest executable. 31 32 Args: 33 path: String path to a gtest executable 34 litConfig: LitConfig instance 35 localConfig: TestingConfig instance""" 36 37 # TODO: allow splitting tests according to the "benchmark family" so 38 # the output for a single family of tests all belongs to the same test 39 # target. 40 list_test_cmd = [path, "--benchmark_list_tests"] 41 try: 42 output = subprocess.check_output(list_test_cmd, env=localConfig.environment) 43 except subprocess.CalledProcessError as exc: 44 litConfig.warning( 45 "unable to discover google-benchmarks in %r: %s. Process output: %s" 46 % (path, sys.exc_info()[1], exc.output) 47 ) 48 raise StopIteration 49 50 nested_tests = [] 51 for ln in output.splitlines(False): # Don't keep newlines. 52 ln = lit.util.to_string(ln) 53 if not ln.strip(): 54 continue 55 56 index = 0 57 while ln[index * 2 : index * 2 + 2] == " ": 58 index += 1 59 while len(nested_tests) > index: 60 nested_tests.pop() 61 62 ln = ln[index * 2 :] 63 if ln.endswith("."): 64 nested_tests.append(ln) 65 elif any([name.startswith("DISABLED_") for name in nested_tests + [ln]]): 66 # Gtest will internally skip these tests. No need to launch a 67 # child process for it. 68 continue 69 else: 70 yield "".join(nested_tests) + ln 71 72 def getTestsInDirectory(self, testSuite, path_in_suite, litConfig, localConfig): 73 source_path = testSuite.getSourcePath(path_in_suite) 74 for subdir in self.test_sub_dirs: 75 dir_path = os.path.join(source_path, subdir) 76 if not os.path.isdir(dir_path): 77 continue 78 for fn in lit.util.listdir_files(dir_path, suffixes=self.test_suffixes): 79 # Discover the tests in this executable. 80 execpath = os.path.join(source_path, subdir, fn) 81 testnames = self.getBenchmarkTests(execpath, litConfig, localConfig) 82 for testname in testnames: 83 testPath = path_in_suite + (subdir, fn, testname) 84 yield lit.Test.Test( 85 testSuite, testPath, localConfig, file_path=execpath 86 ) 87 88 def execute(self, test, litConfig): 89 testPath, testName = os.path.split(test.getSourcePath()) 90 while not os.path.exists(testPath): 91 # Handle GTest parametrized and typed tests, whose name includes 92 # some '/'s. 93 testPath, namePrefix = os.path.split(testPath) 94 testName = namePrefix + "/" + testName 95 96 cmd = [testPath, "--benchmark_filter=%s$" % testName] + self.benchmark_args 97 98 if litConfig.noExecute: 99 return lit.Test.PASS, "" 100 101 try: 102 out, err, exitCode = lit.util.executeCommand( 103 cmd, 104 env=test.config.environment, 105 timeout=litConfig.maxIndividualTestTime, 106 ) 107 except lit.util.ExecuteCommandTimeoutException: 108 return ( 109 lit.Test.TIMEOUT, 110 "Reached timeout of {} seconds".format(litConfig.maxIndividualTestTime), 111 ) 112 113 if exitCode: 114 return lit.Test.FAIL, ("exit code: %d\n" % exitCode) + out + err 115 116 passing_test_line = testName 117 if passing_test_line not in out: 118 msg = "Unable to find %r in google benchmark output:\n\n%s%s" % ( 119 passing_test_line, 120 out, 121 err, 122 ) 123 return lit.Test.UNRESOLVED, msg 124 125 return lit.Test.PASS, err + out 126