1# Copyright 2021 Huawei Technologies Co., Ltd 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14# ============================================================================ 15"""tbe compile job definition""" 16import datetime 17import json 18from enum import Enum 19 20 21class JobType(Enum): 22 """ Job Type """ 23 INITIALIZE_JOB = 'Initialize' 24 FINALIZE_JOB = 'Finalize' 25 CHECK_JOB = 'CheckSupport' 26 SELECT_JOB = 'SelectFormat' 27 PRECOMPILE_JOB = 'PreCompile' 28 COMPILE_JOB = 'Compile' 29 FUSION_COMPILE_JOB = 'FusionOpCompile' 30 TUNE_JOB = 'Tune' 31 QUERY_JOB = 'Query' 32 33 34class LogLevel(Enum): 35 """ Log Level """ 36 DEBUG = 0 37 INFO = 1 38 WARNING = 2 39 ERROR = 3 40 ERROR_MANAGER = 4 41 42 43class JobStatus(Enum): 44 """ Job Status """ 45 JOB_INITIAL = "INITIAL" 46 JOB_FAILED = "FAILED" 47 JOB_SUCCESS = "SUCCESS" 48 JOB_RUNNING = "RUNNING" 49 50 51class LogMessage: 52 """ Log message """ 53 54 def __init__(self, index, level, info): 55 self.index = index 56 self.level = level 57 self.info = info 58 59 60def _get_message(msg, args): 61 """ 62 Return the message for this LogRecord. 63 64 Return the message for this LogRecord after merging any user-supplied 65 arguments with the message. 66 """ 67 msg = str(msg) 68 if args: 69 msg = msg % args 70 return str(datetime.datetime.now()) + ": " + msg 71 72 73class TbeJob: 74 """ Tbe compilation job """ 75 76 def __init__(self, source_id, job_id, job_type, content, fusion_op_name, json_str, sys_info): 77 self.source_id = source_id 78 self.id = job_id 79 self.type = JobType(job_type) 80 self.status = JobStatus.JOB_INITIAL 81 self.content = content 82 self.fusion_op_name = fusion_op_name 83 self.result = "" 84 self.process_info = [] 85 self.json_string = json_str 86 self._sys_logger = sys_info["logger"] 87 self.sys_offline_tune = sys_info["offline_tune"] 88 self.sys_tune_dump_path = sys_info["tune_dump_path"] 89 self.sys_para_debug_path = sys_info["para_debug_path"] 90 # license info 91 self.rl_tune_switch = sys_info["rl_tune_switch"] 92 self.rl_tune_list = sys_info["rl_tune_list"] 93 self.op_tune_switch = sys_info["op_tune_switch"] 94 self.op_tune_list = sys_info["op_tune_list"] 95 self.pass_list = sys_info["pass_list"] 96 97 def debug(self, msg, *args, **kwargs): 98 """ 99 log debug level info 100 :param msg: 101 :param args: 102 :return: 103 """ 104 processed_msg = _get_message(msg, args) 105 message = LogMessage(len(self.process_info), LogLevel.DEBUG, processed_msg) 106 self.process_info.append(message) 107 self._sys_logger.debug(msg, *args, **kwargs) 108 109 def info(self, msg, *args, **kwargs): 110 """ 111 log info level info 112 :param msg: 113 :param args: 114 :return: 115 """ 116 processed_msg = _get_message(msg, args) 117 message = LogMessage(len(self.process_info), LogLevel.INFO, processed_msg) 118 self.process_info.append(message) 119 self._sys_logger.info(msg, *args, **kwargs) 120 121 def warning(self, msg, *args, **kwargs): 122 """ 123 log warning level info 124 :param msg: 125 :param args: 126 :return: 127 """ 128 processed_msg = _get_message(msg, args) 129 message = LogMessage(len(self.process_info), LogLevel.WARNING, processed_msg) 130 self.process_info.append(message) 131 self._sys_logger.warning(msg, *args, **kwargs) 132 133 def error(self, msg, *args, **kwargs): 134 """ 135 log error level info 136 :param msg: 137 :param args: 138 :return: 139 """ 140 processed_msg = _get_message(msg, args) 141 message = LogMessage(len(self.process_info), LogLevel.ERROR, processed_msg) 142 self.process_info.append(message) 143 self._sys_logger.error(msg, *args, **kwargs) 144 145 def error_manager(self, msg, *args, **kwargs): 146 """ 147 log exception level info 148 :param msg: 149 :param args: 150 :return: 151 """ 152 if not msg: 153 self.warning("Get empty error manager message, op_name: {}".format(self.fusion_op_name)) 154 return 155 exception_info = None 156 op_name = self.fusion_op_name 157 if isinstance(msg, Exception): 158 for arg in msg.args: 159 if isinstance(arg, dict) and "errCode" in arg: 160 exception_info = arg 161 break 162 if not exception_info: 163 self.error("Exception message:{}".format(msg)) 164 return 165 else: 166 exception_info = msg[0] 167 if len(msg) >= 2: 168 op_name = msg[1] 169 if not isinstance(exception_info, dict) or not exception_info: 170 self.warning("Get illegal error manager message, op_name: {}".format(self.fusion_op_name)) 171 return 172 exception_info["op_name"] = op_name 173 processed_msg = json.dumps(exception_info) 174 message = LogMessage(len(self.process_info), LogLevel.ERROR_MANAGER, processed_msg) 175 self.process_info.append(message) 176 self._sys_logger.exception(msg, *args, **kwargs) 177 178 def get_result(self): 179 """ 180 Get tht job process result string 181 :return: job process result string 182 """ 183 result = dict() 184 result["status"] = self.status.value 185 result["source_id"] = self.source_id 186 result["job_id"] = self.id 187 result["job_type"] = self.type.value 188 result["fusion_op_name"] = self.fusion_op_name 189 result["result"] = self.result 190 process_info = [] 191 for info in self.process_info: 192 msg = {"index": info.index, "level": info.level.value, "message": info.info} 193 process_info.append(msg) 194 result["process_info"] = process_info 195 return json.dumps(result) 196