#! /usr/bin/python #/****************************************************************** #// #// OpenCL Conformance Tests #// #// Copyright: (c) 2008-2009 by Apple Inc. All Rights Reserved. #// #******************************************************************/ import os, re, sys, subprocess, time, commands, tempfile, math, string DEBUG = 0 log_file_name = "opencl_conformance_results_" + time.strftime("%Y-%m-%d_%H-%M", time.localtime())+ ".log" process_pid = 0 # The amount of time between printing a "." (if no output from test) or ":" (if output) # to the screen while the tests are running. seconds_between_status_updates = 60*60*24*7 # effectively never # Help info def write_help_info() : print("run_conformance.py test_list [CL_DEVICE_TYPE(s) to test] [partial-test-names, ...] [log=path/to/log/file/]") print(" test_list - the .csv file containing the test names and commands to run the tests.") print(" [partial-test-names, ...] - optional partial strings to select a subset of the tests to run.") print(" [CL_DEVICE_TYPE(s) to test] - list of CL device types to test, default is CL_DEVICE_TYPE_DEFAULT.") print(" [log=path/to/log/file/] - provide a path for the test log file, default is in the current directory.") print(" (Note: spaces are not allowed in the log file path.") # Get the time formatted nicely def get_time() : return time.strftime("%d-%b %H:%M:%S", time.localtime()) # Write text to the screen and the log file def write_screen_log(text) : global log_file print(text) log_file.write(text+"\n") # Load the tests from a csv formated file of the form name,command def get_tests(filename, devices_to_test): tests = [] if (os.path.exists(filename) == False): print("FAILED: test_list \"" + filename + "\" does not exist.") print("") write_help_info() sys.exit(-1) file = open(filename, 'r') for line in file.readlines(): comment = re.search("^#.*", line) if (comment): continue device_specific_match = re.search("^\s*(.+?)\s*,\s*(.+?)\s*,\s*(.+?)\s*$", line) if (device_specific_match): if (device_specific_match.group(1) in devices_to_test): test_path = string.replace(device_specific_match.group(3), '/', os.sep) test_name = string.replace(device_specific_match.group(2), '/', os.sep) tests.append((test_name, test_path)) else: print("Skipping " + device_specific_match.group(2) + " because " + device_specific_match.group(1) + " is not in the list of devices to test.") continue match = re.search("^\s*(.+?)\s*,\s*(.+?)\s*$", line) if (match): test_path = string.replace(match.group(2), '/', os.sep) test_name = string.replace(match.group(1), '/', os.sep) tests.append((test_name, test_path)) return tests def run_test_checking_output(current_directory, test_dir, log_file): global process_pid, seconds_between_status_updates failures_this_run = 0 start_time = time.time() # Create a temporary file for capturing the output from the test (output_fd, output_name) = tempfile.mkstemp() if ( not os.path.exists(output_name)) : write_screen_log("\n ==> ERROR: could not create temporary file %s ." % output_name) os.close(output_fd) return -1 # Execute the test program_to_run = test_dir_without_args = test_dir.split(None, 1)[0] if ( os.sep == '\\' ) : program_to_run += ".exe" if (os.path.exists(current_directory + os.sep + program_to_run)) : os.chdir(os.path.dirname(current_directory+os.sep+test_dir_without_args) ) try: if (DEBUG): p = subprocess.Popen("", stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True) else : p = subprocess.Popen(current_directory + os.sep + test_dir, stderr=output_fd, stdout=output_fd, shell=True) except OSError: write_screen_log("\n ==> ERROR: failed to execute test. Failing test. : " + str(OSError)) os.close(output_fd) return -1 else: write_screen_log("\n ==> ERROR: test file (" + current_directory + os.sep + program_to_run +") does not exist. Failing test.") os.close(output_fd) return -1 # Set the global pid so we can kill it if this is aborted process_pid = p.pid # Read one character at a time from the temporary output file while the process is running. # When we get an end-of-line, look for errors and write the results to the log file. # This allows us to process the file as it is being produced. # Keep track of the state for reading # Whether we are done, if we have more to read, and where in the file we last read done = False more_to_read = True pointer = 0 pointer_at_last_user_update = 0 output_this_run = False try: read_output = open(output_name, 'r') except IOError: write_screen_log("\n ==> ERROR: could not open output file from test.") os.close(output_fd) return -1 line = "" while (not done or more_to_read): os.fsync(output_fd) # Determine if we should display some output elapsed_time = (time.time() - start_time) if (elapsed_time > seconds_between_status_updates): start_time = time.time() # If we've received output from the test since the last update, display a # if (pointer != pointer_at_last_user_update): sys.stdout.write(":") else: sys.stdout.write(".") pointer_at_last_user_update = pointer sys.stdout.flush() # Check if we're done p.poll() if (not done and p.returncode != None): if (p.returncode < 0): if (not output_this_run): print "" output_this_run = True write_screen_log(" ==> ERROR: test killed/crashed: " + str(p.returncode)+ ".") done = True # Try reading try: read_output.seek(pointer) char_read = read_output.read(1) except IOError: time.sleep(1) continue # If we got a full line then process it if (char_read == "\n"): # Look for failures and report them as such match = re.search(".*(FAILED|ERROR).*", line) if (match): if (not output_this_run): print "" output_this_run = True print(" ==> " + line.replace('\n','')) match = re.search(".*FAILED.*", line) if (match): failures_this_run = failures_this_run + 1 match = re.search(".*(PASSED).*", line) if (match): if (not output_this_run): print "" output_this_run = True print(" " + line.replace('\n','')) # Write it to the log log_file.write(" " + line +"\n") log_file.flush() line = "" pointer = pointer + 1 # If we are at the end of the file, then re-open it to get new data elif (char_read == ""): more_to_read = False read_output.close() time.sleep(1) try: os.fsync(output_fd) read_output = open(output_name, 'r') # See if there is more to read. This happens if the process ends and we have data left. read_output.seek(pointer) if (read_output.read(1) != ""): more_to_read = True except IOError: write_screen_log("\n ==> ERROR: could not reopen output file from test.") return -1 done = True else: line = line + char_read pointer = pointer + 1 # Now we are done, so write out any remaining data in the file: # This should only happen if the process exited with an error. os.fsync(output_fd) while (read_output.read(1) != ""): log_file.write(read_output.read(1)) # Return the total number of failures if (p.returncode == 0 and failures_this_run > 0): write_screen_log("\n ==> ERROR: Test returned 0, but number of FAILED lines reported is " + str(failures_this_run) +".") return failures_this_run return p.returncode def run_tests(tests) : global curent_directory global process_pid # Run the tests failures = 0 previous_test = None test_number = 1 for test in tests: # Print the name of the test we're running and the time (test_name, test_dir) = test if (test_dir != previous_test): print("========== " + test_dir) log_file.write("========================================================================================\n") log_file.write("========================================================================================\n") log_file.write("(" + get_time() + ") Running Tests: " + test_dir +"\n") log_file.write("========================================================================================\n") log_file.write("========================================================================================\n") previous_test = test_dir print("("+get_time()+") BEGIN " + test_name.ljust(40) +": "), log_file.write(" ----------------------------------------------------------------------------------------\n") log_file.write(" (" + get_time() + ") Running Sub Test: " + test_name + "\n") log_file.write(" ----------------------------------------------------------------------------------------\n") log_file.flush() sys.stdout.flush() # Run the test result = 0 start_time = time.time() try: process_pid = 0 result = run_test_checking_output(current_directory, test_dir, log_file) except KeyboardInterrupt: # Catch an interrupt from the user write_screen_log("\nFAILED: Execution interrupted. Killing test process, but not aborting full test run.") os.kill(process_pid, 9) answer = raw_input("Abort all tests? (y/n)") if (answer.find("y") != -1): write_screen_log("\nUser chose to abort all tests.") log_file.close() sys.exit(-1) else: write_screen_log("\nUser chose to continue with other tests. Reporting this test as failed.") result = 1 run_time = (time.time() - start_time) # Move print the finish status if (result == 0): print("("+get_time()+") PASSED " + test_name.ljust(40) +": (" + str(int(run_time)).rjust(3) + "s, test " + str(test_number).rjust(3) + os.sep + str(len(tests)) +")"), else: print("("+get_time()+") FAILED " + test_name.ljust(40) +": (" + str(int(run_time)).rjust(3) + "s, test " + str(test_number).rjust(3) + os.sep + str(len(tests)) +")"), test_number = test_number + 1 log_file.write(" ----------------------------------------------------------------------------------------\n") log_file.flush() print("") if (result != 0): log_file.write(" *******************************************************************************************\n") log_file.write(" * ("+get_time()+") Test " + test_name + " ==> FAILED: " + str(result)+"\n") log_file.write(" *******************************************************************************************\n") failures = failures + 1 else: log_file.write(" ("+get_time()+") Test " + test_name +" passed in " + str(run_time) + "s\n") log_file.write(" ----------------------------------------------------------------------------------------\n") log_file.write("\n") return failures # ######################## # Begin OpenCL conformance run script # ######################## if (len(sys.argv) < 2): write_help_info() sys.exit(-1) current_directory = os.getcwd() # Open the log file for arg in sys.argv: match = re.search("log=(\S+)", arg) if (match): log_file_name = match.group(1).rstrip('/') + os.sep + log_file_name try: log_file = open(log_file_name, "w") except IOError: print "Could not open log file " + log_file_name # Determine which devices to test device_types = ["CL_DEVICE_TYPE_DEFAULT", "CL_DEVICE_TYPE_CPU", "CL_DEVICE_TYPE_GPU", "CL_DEVICE_TYPE_ACCELERATOR", "CL_DEVICE_TYPE_ALL"] devices_to_test = [] for device in device_types: if device in sys.argv[2:]: devices_to_test.append(device) if (len(devices_to_test) == 0): devices_to_test = ["CL_DEVICE_TYPE_DEFAULT"] write_screen_log("Testing on: " + str(devices_to_test)) # Get the tests tests = get_tests(sys.argv[1], devices_to_test) # If tests are specified on the command line then run just those ones tests_to_use = [] num_of_patterns_to_match = 0 for arg in sys.argv[2:]: if arg in device_types: continue if re.search("log=(\S+)", arg): continue num_of_patterns_to_match = num_of_patterns_to_match + 1 found_it = False for test in tests: (test_name, test_dir) = test if (test_name.find(arg) != -1 or test_dir.find(arg) != -1): found_it = True if (test not in tests_to_use): tests_to_use.append(test) if (found_it == False): print("Failed to find a test matching " + arg) if (len(tests_to_use) == 0): if (num_of_patterns_to_match > 0): print("FAILED: Failed to find any tests matching the given command-line options.") print("") write_help_info() sys.exit(-1) else: tests = tests_to_use[:] write_screen_log("Test execution arguments: " + str(sys.argv)) write_screen_log("Logging to file " + log_file_name +".") write_screen_log("Loaded tests from " + sys.argv[1] + ", total of " + str(len(tests)) + " tests selected to run:") for (test_name, test_command) in tests: write_screen_log(test_name.ljust(50) + " (" + test_command +")") # Run the tests total_failures = 0 for device_to_test in devices_to_test: os.environ['CL_DEVICE_TYPE'] = device_to_test write_screen_log("========================================================================================") write_screen_log("========================================================================================") write_screen_log(("Setting CL_DEVICE_TYPE to " + device_to_test).center(90)) write_screen_log("========================================================================================") write_screen_log("========================================================================================") failures = run_tests(tests) write_screen_log("========================================================================================") if (failures == 0): write_screen_log(">> TEST on " + device_to_test + " PASSED") else: write_screen_log(">> TEST on " + device_to_test + " FAILED (" + str(failures) + " FAILURES)") write_screen_log("========================================================================================") total_failures = total_failures + failures write_screen_log("("+get_time()+") Testing complete. " + str(total_failures) + " failures for " + str(len(tests)) + " tests.") log_file.close()