• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

#
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
18
19import os
20import re
21import subprocess
22
23from datetime import datetime
24
25from util.log_util import LogUtil
26from hb.helper.no_instance import NoInstance
27from containers.status import throw_exception
28import copy
29
30
class HandleKwargs(metaclass=NoInstance):
    """Interpret the logging-related keyword arguments of ``SystemUtil.exec_command``.

    Every method is static and works on a plain ``dict`` of keyword arguments.
    The keys listed in ``key_word_register_list`` are consumed by this class and
    are removed from the dict before it is forwarded to ``subprocess.Popen``.
    """

    # Matches text of the form "[<digits>/<digits>]<anything>", e.g. "[12/345] CXX foo.o"
    # (presumably build-progress lines -- confirm against the tools being run).
    compile_item_pattern = re.compile(r'\[\d+/\d+\].+')
    # Keyword names handled here rather than passed on to the subprocess call.
    key_word_register_list = ["pre_msg", "log_stage", "after_msg", "log_filter", "custom_line_handle"]
    # Class-wide switch read by handle_print().
    # NOTE(review): set_filter_print() only ever turns this on; nothing in this
    # class turns it off again, so it stays set for later calls -- confirm intended.
    filter_print = False

    @staticmethod
    def remove_registry_kwargs(kw_dict):
        """Delete every registered helper key from ``kw_dict`` in place."""
        for key in HandleKwargs.key_word_register_list:
            if key in kw_dict:
                del kw_dict[key]

    @staticmethod
    def before_msg(kw_dict):
        """Log the ``pre_msg`` entry of ``kw_dict`` when it is present and non-empty."""
        message = kw_dict.get('pre_msg', '')
        if not message:
            return
        LogUtil.hb_info(message)

    @staticmethod
    def set_log_stage(kw_dict):
        """Pass a non-empty ``log_stage`` entry of ``kw_dict`` to ``LogUtil.set_stage``."""
        stage = kw_dict.get('log_stage', '')
        if not stage:
            return
        LogUtil.set_stage(stage)

    @staticmethod
    def remove_useless_space(cmd):
        """Remove every ``""`` element from ``cmd`` in place and return ``cmd``."""
        for _ in range(cmd.count("")):
            cmd.remove("")
        return cmd

    @staticmethod
    def after_msg(kw_dict):
        """Log the ``after_msg`` entry of ``kw_dict`` when it is present and non-empty."""
        message = kw_dict.get('after_msg', '')
        if not message:
            return
        LogUtil.hb_info(message)

    @staticmethod
    def clear_log_stage(kw_dict):
        """Call ``LogUtil.clear_stage`` when ``kw_dict`` carries a non-empty ``log_stage``."""
        stage = kw_dict.get('log_stage', '')
        if not stage:
            return
        LogUtil.clear_stage()

    @staticmethod
    def handle_line(line, kw_dict):
        """Run ``line`` through the optional ``custom_line_handle`` callable.

        Returns whatever the callable returns; the caller unpacks it as a
        ``(keep, line)`` pair. Without a callable the result is ``(True, line)``.
        """
        handler = kw_dict.get('custom_line_handle', False)
        if not handler:
            return True, line
        return handler(line)

    @staticmethod
    def set_filter_print(log_mode, kw_dict):
        """Turn on ``filter_print`` when ``log_filter`` is truthy or ``log_mode`` is ``'silent'``."""
        wants_filter = kw_dict.get('log_filter', False)
        if not wants_filter and log_mode != 'silent':
            return
        HandleKwargs.filter_print = True

    @staticmethod
    def handle_print(line, log_mode):
        """Log ``line``, or only its first progress-pattern match when filtering is on."""
        if not HandleKwargs.filter_print:
            LogUtil.hb_info(line)
            return
        # Filtering: lines without a "[n/m] ..." fragment are dropped silently.
        matched = re.search(HandleKwargs.compile_item_pattern, line)
        if matched is not None:
            LogUtil.hb_info(matched.group(0), mode=log_mode)
93
class SystemUtil(metaclass=NoInstance):
    """Static helpers for running external commands and reading the clock."""

    @staticmethod
    def exec_command(cmd: list, log_path='out/build.log', exec_env=None, log_mode='normal',
                     **kwargs):
        """Run ``cmd`` as a subprocess and stream its output to a log file and the logger.

        Args:
            cmd: Argument list for the subprocess; empty-string elements are
                removed (in place) before it is executed.
            log_path: File that receives the output, opened in append mode. Its
                parent directory is created when it does not exist.
            exec_env: Environment mapping handed to ``subprocess.Popen`` as ``env``.
            log_mode: Forwarded to ``HandleKwargs``; ``'silent'`` enables filtered
                printing.
            **kwargs: The keys in ``HandleKwargs.key_word_register_list``
                (``pre_msg``, ``log_stage``, ``after_msg``, ``log_filter``,
                ``custom_line_handle``) are consumed here; everything else is
                passed through to ``subprocess.Popen``.

        Returns:
            The subprocess return code.
        """
        # Keep a snapshot that still has the helper keys, then strip them from
        # the dict that is forwarded to Popen.
        raw_kwargs = copy.deepcopy(kwargs)
        HandleKwargs.remove_registry_kwargs(kwargs)

        HandleKwargs.set_log_stage(raw_kwargs)
        HandleKwargs.set_filter_print(log_mode, raw_kwargs)
        cmd = HandleKwargs.remove_useless_space(cmd)

        # A bare file name has an empty dirname; os.makedirs('') would raise,
        # so only create the directory when there is one to create.
        log_dir = os.path.dirname(log_path)
        if log_dir and not os.path.exists(log_dir):
            os.makedirs(log_dir, exist_ok=True)

        HandleKwargs.before_msg(raw_kwargs)
        # Popen as a context manager closes the stdout pipe and waits for the
        # child on exit, including when the read loop raises.
        with open(log_path, 'at', encoding='utf-8') as log_file, \
                subprocess.Popen(cmd,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.STDOUT,
                                 encoding='utf-8',
                                 env=exec_env,
                                 **kwargs) as process:
            # readline() returns '' at EOF, which ends the iteration.
            for line in iter(process.stdout.readline, ''):
                keep_deal, new_line = HandleKwargs.handle_line(line, raw_kwargs)
                if keep_deal:
                    log_file.write(new_line)
                    HandleKwargs.handle_print(new_line, log_mode)

        HandleKwargs.after_msg(raw_kwargs)
        HandleKwargs.clear_log_stage(raw_kwargs)

        ret_code = process.returncode
        if ret_code != 0:
            LogUtil.get_failed_log(log_path)
        return ret_code

    @staticmethod
    def get_current_time(time_type: str = 'default'):
        """Return the current time in the representation selected by ``time_type``.

        Args:
            time_type: ``'timestamp'`` for integer milliseconds since the Unix
                epoch, ``'datetime'`` for a local ``'%Y-%m-%d %H:%M:%S'`` string;
                any other value yields a local ``datetime`` with microseconds
                zeroed.
        """
        if time_type == 'timestamp':
            # datetime.utcnow() is naive, and timestamp() reads naive values as
            # local time, which skews the result by the UTC offset. A local
            # now() converts to the correct epoch value.
            return int(datetime.now().timestamp() * 1000)
        if time_type == 'datetime':
            return datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        return datetime.now().replace(microsecond=0)
138
139
class ExecEnviron:
    """Hold an environment mapping that can be narrowed to an allow-list.

    The mapping is ``None`` until ``initenv`` copies the current process
    environment into it.
    """

    def __init__(self):
        self._env = None

    @property
    def allenv(self):
        """The stored environment mapping, or ``None`` before ``initenv``."""
        return self._env

    @property
    def allkeys(self):
        """List of variable names currently stored (empty before ``initenv``)."""
        return [] if self._env is None else list(self._env)

    def initenv(self):
        """Replace the stored mapping with a copy of ``os.environ``."""
        self._env = dict(os.environ)

    def allow(self, allowed_vars: list):
        """Keep only the variables whose names appear in ``allowed_vars``.

        Does nothing when the environment has not been initialised.
        """
        if self._env is None:
            return
        kept = {}
        for name, value in self._env.items():
            if name in allowed_vars:
                kept[name] = value
        self._env = kept
161