#!/usr/bin/env python3
# -*- coding: utf-8 -*-

#
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import copy
import os
import re
import subprocess

from datetime import datetime
from datetime import timezone

from util.log_util import LogUtil
from hb.helper.no_instance import NoInstance
from containers.status import throw_exception


class HandleKwargs(metaclass=NoInstance):
    """Helpers for the extra keyword arguments accepted by
    ``SystemUtil.exec_command``.

    The keys listed in ``key_word_register_list`` are consumed by this class
    and stripped from the kwargs before they are forwarded to
    ``subprocess.Popen``. All members are static; the ``NoInstance``
    metaclass is defined elsewhere (presumably it forbids instantiation).
    """

    # Matches text of the form "[<digits>/<digits>]<at least one more char>".
    compile_item_pattern = re.compile(r'\[\d+/\d+\].+')
    # Keyword names handled here rather than passed on to Popen.
    key_word_register_list = ["pre_msg", "log_stage", "after_msg", "log_filter", "custom_line_handle"]
    # Process-wide switch read by handle_print().
    # NOTE(review): set_filter_print() only ever turns this on, so once one
    # command enables filtering it stays enabled for every later command in
    # the same process -- confirm that this stickiness is intended.
    filter_print = False

    @staticmethod
    def remove_registry_kwargs(kw_dict):
        """Delete every registered keyword from ``kw_dict`` in place.

        Keys that are absent are ignored.
        """
        for item in HandleKwargs.key_word_register_list:
            kw_dict.pop(item, None)

    @staticmethod
    def before_msg(kw_dict):
        """Pass the ``pre_msg`` value, if non-empty, to ``LogUtil.hb_info``."""
        pre_msg = kw_dict.get('pre_msg', '')
        if pre_msg:
            LogUtil.hb_info(pre_msg)

    @staticmethod
    def set_log_stage(kw_dict):
        """Pass the ``log_stage`` value, if non-empty, to ``LogUtil.set_stage``."""
        log_stage = kw_dict.get('log_stage', '')
        if log_stage:
            LogUtil.set_stage(log_stage)

    @staticmethod
    def remove_useless_space(cmd):
        """Remove every empty-string element from ``cmd`` in place.

        The same list object is mutated and returned, so callers holding a
        reference to ``cmd`` observe the change.
        """
        # Single pass with slice assignment instead of repeated
        # ``"" in cmd`` / ``cmd.remove("")`` scans.
        cmd[:] = [arg for arg in cmd if arg != ""]
        return cmd

    @staticmethod
    def after_msg(kw_dict):
        """Pass the ``after_msg`` value, if non-empty, to ``LogUtil.hb_info``."""
        after_msg = kw_dict.get('after_msg', '')
        if after_msg:
            LogUtil.hb_info(after_msg)

    @staticmethod
    def clear_log_stage(kw_dict):
        """Call ``LogUtil.clear_stage`` when a non-empty ``log_stage`` was given."""
        log_stage = kw_dict.get('log_stage', '')
        if log_stage:
            LogUtil.clear_stage()

    @staticmethod
    def handle_line(line, kw_dict):
        """Run one output line through the optional ``custom_line_handle``.

        Returns whatever the callback returns; ``exec_command`` unpacks that
        as a ``(keep, new_line)`` pair. Without a callback the line is kept
        unchanged: ``(True, line)``.
        """
        filter_function = kw_dict.get('custom_line_handle', False)
        if filter_function:
            return filter_function(line)
        return True, line

    @staticmethod
    def set_filter_print(log_mode, kw_dict):
        """Turn on filtered printing when ``log_filter`` is truthy or the
        log mode is ``'silent'``. Never turns it off again.
        """
        if kw_dict.get('log_filter', False) or log_mode == 'silent':
            HandleKwargs.filter_print = True

    @staticmethod
    def handle_print(line, log_mode):
        """Forward ``line`` to ``LogUtil.hb_info``.

        With filtering enabled, only the first ``compile_item_pattern`` match
        in the line is forwarded (together with ``mode=log_mode``); lines
        with no match are dropped. With filtering disabled the whole line is
        forwarded.
        """
        if HandleKwargs.filter_print:
            matched = HandleKwargs.compile_item_pattern.search(line)
            if matched:
                LogUtil.hb_info(matched.group(0), mode=log_mode)
        else:
            LogUtil.hb_info(line)


class SystemUtil(metaclass=NoInstance):
    """Static helpers for running external commands and reading the clock."""

    @staticmethod
    def exec_command(cmd: list, log_path='out/build.log', exec_env=None, log_mode='normal',
                     **kwargs):
        """Run ``cmd``, streaming its combined stdout/stderr to a log file.

        Args:
            cmd: Argument list for the child process. Empty-string elements
                are removed from this list in place before it is executed.
            log_path: File that the output is appended to (UTF-8). Its parent
                directory is created when missing.
            exec_env: Environment mapping for the child, or None to inherit.
            log_mode: Forwarded to the print helper; ``'silent'`` enables
                filtered printing.
            **kwargs: The keys in ``HandleKwargs.key_word_register_list`` are
                consumed here; every other key is forwarded to
                ``subprocess.Popen``.

        Returns:
            The child's return code. On a non-zero code
            ``LogUtil.get_failed_log(log_path)`` is called first (defined
            elsewhere; it may raise instead of returning).
        """
        # A shallow copy is enough: only key removal must be isolated.
        # A deep copy would clone the object behind a bound-method
        # ``custom_line_handle`` and fails on non-copyable Popen arguments.
        raw_kwargs = dict(kwargs)
        HandleKwargs.remove_registry_kwargs(kwargs)

        HandleKwargs.set_log_stage(raw_kwargs)
        HandleKwargs.set_filter_print(log_mode, raw_kwargs)
        cmd = HandleKwargs.remove_useless_space(cmd)

        # dirname is '' for a bare file name, and os.makedirs('') raises
        # FileNotFoundError, so only create a directory when there is one.
        log_dir = os.path.dirname(log_path)
        if log_dir and not os.path.exists(log_dir):
            os.makedirs(log_dir, exist_ok=True)

        HandleKwargs.before_msg(raw_kwargs)
        # The Popen context manager closes the pipe and waits for the child
        # on exit, including when the loop below raises.
        with open(log_path, 'at', encoding='utf-8') as log_file, \
                subprocess.Popen(cmd,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.STDOUT,
                                 encoding='utf-8',
                                 env=exec_env,
                                 **kwargs) as process:
            for line in process.stdout:
                keep_deal, new_line = HandleKwargs.handle_line(line, raw_kwargs)
                if keep_deal:
                    log_file.write(new_line)
                    HandleKwargs.handle_print(new_line, log_mode)

        HandleKwargs.after_msg(raw_kwargs)
        HandleKwargs.clear_log_stage(raw_kwargs)

        ret_code = process.returncode
        if ret_code != 0:
            LogUtil.get_failed_log(log_path)
        return ret_code

    @staticmethod
    def get_current_time(time_type: str = 'default'):
        """Return the current time in the requested representation.

        Args:
            time_type: ``'timestamp'`` for integer milliseconds since the
                Unix epoch, ``'datetime'`` for a local-time string formatted
                as ``YYYY-mm-dd HH:MM:SS``, anything else for a naive local
                ``datetime`` truncated to whole seconds.
        """
        if time_type == 'timestamp':
            # Use an aware datetime: calling .timestamp() on the naive result
            # of utcnow() interprets it as local time and skews the epoch by
            # the local UTC offset.
            return int(datetime.now(tz=timezone.utc).timestamp() * 1000)
        if time_type == 'datetime':
            return datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        return datetime.now().replace(microsecond=0)


class ExecEnviron:
    """Holds an environment mapping that can be narrowed to an allow-list."""

    def __init__(self):
        # None until initenv() takes a snapshot of os.environ.
        self._env = None

    @property
    def allenv(self):
        """The current environment mapping, or None before ``initenv()``."""
        return self._env

    @property
    def allkeys(self):
        """List of variable names currently held; empty before ``initenv()``."""
        if self._env is None:
            return []
        return list(self._env.keys())

    def initenv(self):
        """Replace the held environment with a copy of ``os.environ``."""
        self._env = os.environ.copy()

    def allow(self, allowed_vars: list):
        """Keep only the variables whose names are in ``allowed_vars``.

        Does nothing when ``initenv()`` has not been called yet.
        """
        if self._env is not None:
            allowed_env = {k: v for k, v in self._env.items() if k in allowed_vars}
            self._env = allowed_env