1# -*- coding: utf-8 -*- 2# Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. 3# See https://llvm.org/LICENSE.txt for license information. 4# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception 5""" This module is a collection of methods commonly used in this project. """ 6import collections 7import functools 8import json 9import logging 10import os 11import os.path 12import re 13import shlex 14import subprocess 15import sys 16 17ENVIRONMENT_KEY = 'INTERCEPT_BUILD' 18 19Execution = collections.namedtuple('Execution', ['pid', 'cwd', 'cmd']) 20 21CtuConfig = collections.namedtuple('CtuConfig', ['collect', 'analyze', 'dir', 22 'extdef_map_cmd']) 23 24 25def duplicate_check(method): 26 """ Predicate to detect duplicated entries. 27 28 Unique hash method can be use to detect duplicates. Entries are 29 represented as dictionaries, which has no default hash method. 30 This implementation uses a set datatype to store the unique hash values. 31 32 This method returns a method which can detect the duplicate values. """ 33 34 def predicate(entry): 35 entry_hash = predicate.unique(entry) 36 if entry_hash not in predicate.state: 37 predicate.state.add(entry_hash) 38 return False 39 return True 40 41 predicate.unique = method 42 predicate.state = set() 43 return predicate 44 45 46def run_build(command, *args, **kwargs): 47 """ Run and report build command execution 48 49 :param command: array of tokens 50 :return: exit code of the process 51 """ 52 environment = kwargs.get('env', os.environ) 53 logging.debug('run build %s, in environment: %s', command, environment) 54 exit_code = subprocess.call(command, *args, **kwargs) 55 logging.debug('build finished with exit code: %d', exit_code) 56 return exit_code 57 58 59def run_command(command, cwd=None): 60 """ Run a given command and report the execution. 61 62 :param command: array of tokens 63 :param cwd: the working directory where the command will be executed 64 :return: output of the command 65 """ 66 def decode_when_needed(result): 67 """ check_output returns bytes or string depend on python version """ 68 return result.decode('utf-8') if isinstance(result, bytes) else result 69 70 try: 71 directory = os.path.abspath(cwd) if cwd else os.getcwd() 72 logging.debug('exec command %s in %s', command, directory) 73 output = subprocess.check_output(command, 74 cwd=directory, 75 stderr=subprocess.STDOUT) 76 return decode_when_needed(output).splitlines() 77 except subprocess.CalledProcessError as ex: 78 ex.output = decode_when_needed(ex.output).splitlines() 79 raise ex 80 81 82def reconfigure_logging(verbose_level): 83 """ Reconfigure logging level and format based on the verbose flag. 84 85 :param verbose_level: number of `-v` flags received by the command 86 :return: no return value 87 """ 88 # Exit when nothing to do. 89 if verbose_level == 0: 90 return 91 92 root = logging.getLogger() 93 # Tune logging level. 94 level = logging.WARNING - min(logging.WARNING, (10 * verbose_level)) 95 root.setLevel(level) 96 # Be verbose with messages. 97 if verbose_level <= 3: 98 fmt_string = '%(name)s: %(levelname)s: %(message)s' 99 else: 100 fmt_string = '%(name)s: %(levelname)s: %(funcName)s: %(message)s' 101 handler = logging.StreamHandler(sys.stdout) 102 handler.setFormatter(logging.Formatter(fmt=fmt_string)) 103 root.handlers = [handler] 104 105 106def command_entry_point(function): 107 """ Decorator for command entry methods. 108 109 The decorator initialize/shutdown logging and guard on programming 110 errors (catch exceptions). 111 112 The decorated method can have arbitrary parameters, the return value will 113 be the exit code of the process. """ 114 115 @functools.wraps(function) 116 def wrapper(*args, **kwargs): 117 """ Do housekeeping tasks and execute the wrapped method. """ 118 119 try: 120 logging.basicConfig(format='%(name)s: %(message)s', 121 level=logging.WARNING, 122 stream=sys.stdout) 123 # This hack to get the executable name as %(name). 124 logging.getLogger().name = os.path.basename(sys.argv[0]) 125 return function(*args, **kwargs) 126 except KeyboardInterrupt: 127 logging.warning('Keyboard interrupt') 128 return 130 # Signal received exit code for bash. 129 except Exception: 130 logging.exception('Internal error.') 131 if logging.getLogger().isEnabledFor(logging.DEBUG): 132 logging.error("Please report this bug and attach the output " 133 "to the bug report") 134 else: 135 logging.error("Please run this command again and turn on " 136 "verbose mode (add '-vvvv' as argument).") 137 return 64 # Some non used exit code for internal errors. 138 finally: 139 logging.shutdown() 140 141 return wrapper 142 143 144def compiler_wrapper(function): 145 """ Implements compiler wrapper base functionality. 146 147 A compiler wrapper executes the real compiler, then implement some 148 functionality, then returns with the real compiler exit code. 149 150 :param function: the extra functionality what the wrapper want to 151 do on top of the compiler call. If it throws exception, it will be 152 caught and logged. 153 :return: the exit code of the real compiler. 154 155 The :param function: will receive the following arguments: 156 157 :param result: the exit code of the compilation. 158 :param execution: the command executed by the wrapper. """ 159 160 def is_cxx_compiler(): 161 """ Find out was it a C++ compiler call. Compiler wrapper names 162 contain the compiler type. C++ compiler wrappers ends with `c++`, 163 but might have `.exe` extension on windows. """ 164 165 wrapper_command = os.path.basename(sys.argv[0]) 166 return re.match(r'(.+)c\+\+(.*)', wrapper_command) 167 168 def run_compiler(executable): 169 """ Execute compilation with the real compiler. """ 170 171 command = executable + sys.argv[1:] 172 logging.debug('compilation: %s', command) 173 result = subprocess.call(command) 174 logging.debug('compilation exit code: %d', result) 175 return result 176 177 # Get relevant parameters from environment. 178 parameters = json.loads(os.environ[ENVIRONMENT_KEY]) 179 reconfigure_logging(parameters['verbose']) 180 # Execute the requested compilation. Do crash if anything goes wrong. 181 cxx = is_cxx_compiler() 182 compiler = parameters['cxx'] if cxx else parameters['cc'] 183 result = run_compiler(compiler) 184 # Call the wrapped method and ignore it's return value. 185 try: 186 call = Execution( 187 pid=os.getpid(), 188 cwd=os.getcwd(), 189 cmd=['c++' if cxx else 'cc'] + sys.argv[1:]) 190 function(result, call) 191 except: 192 logging.exception('Compiler wrapper failed complete.') 193 finally: 194 # Always return the real compiler exit code. 195 return result 196 197 198def wrapper_environment(args): 199 """ Set up environment for interpose compiler wrapper.""" 200 201 return { 202 ENVIRONMENT_KEY: json.dumps({ 203 'verbose': args.verbose, 204 'cc': shlex.split(args.cc), 205 'cxx': shlex.split(args.cxx) 206 }) 207 } 208