• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3
4#
5# Copyright (c) 2023 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import re
21import subprocess
22
23from datetime import datetime
24
25from util.log_util import LogUtil
26from hb.helper.no_instance import NoInstance
27from containers.status import throw_exception
28import copy
29
30
31class HandleKwargs(metaclass=NoInstance):
32    compile_item_pattern = re.compile(r'\[\d+/\d+\].+')
33    key_word_register_list = ["pre_msg", "log_stage", "after_msg", "log_filter", "custom_line_handle"]
34    filter_print = False
35
36    @staticmethod
37    def remove_registry_kwargs(kw_dict):
38        for item in HandleKwargs.key_word_register_list:
39            kw_dict.pop(item, "")
40
41    @staticmethod
42    def before_msg(kw_dict):
43        pre_msg = kw_dict.get('pre_msg', '')
44        if pre_msg:
45            LogUtil.hb_info(pre_msg)
46
47    @staticmethod
48    def set_log_stage(kw_dict):
49        log_stage = kw_dict.get('log_stage', '')
50        if log_stage:
51            LogUtil.set_stage(log_stage)
52
53    @staticmethod
54    def remove_useless_space(cmd):
55        while "" in cmd:
56            cmd.remove("")
57        return cmd
58
59    @staticmethod
60    def after_msg(kw_dict):
61        after_msg = kw_dict.get('after_msg', '')
62        if after_msg:
63            LogUtil.hb_info(after_msg)
64
65    @staticmethod
66    def clear_log_stage(kw_dict):
67        log_stage = kw_dict.get('log_stage', '')
68        if log_stage:
69            LogUtil.clear_stage()
70
71    @staticmethod
72    def handle_line(line, kw_dict):
73        filter_function = kw_dict.get('custom_line_handle', False)
74        if filter_function:
75            return filter_function(line)
76        else:
77            return True, line
78
79    @staticmethod
80    def set_filter_print(log_mode, kw_dict):
81        if kw_dict.get('log_filter', False) or log_mode == 'silent':
82            HandleKwargs.filter_print = True
83
84    @staticmethod
85    def handle_print(line, log_mode):
86        if HandleKwargs.filter_print:
87            info = re.findall(HandleKwargs.compile_item_pattern, line)
88            if len(info):
89                LogUtil.hb_info(info[0], mode=log_mode)
90        else:
91            LogUtil.hb_info(line)
92
93
94class SystemUtil(metaclass=NoInstance):
95    @staticmethod
96    def exec_command(cmd: list, log_path='out/build.log', exec_env=None, log_mode='normal',
97                     **kwargs):
98        raw_kwargs = copy.deepcopy(kwargs)
99        HandleKwargs.remove_registry_kwargs(kwargs)
100
101        HandleKwargs.set_log_stage(raw_kwargs)
102        HandleKwargs.set_filter_print(log_mode, raw_kwargs)
103        cmd = HandleKwargs.remove_useless_space(cmd)
104
105        if not os.path.exists(os.path.dirname(log_path)):
106            os.makedirs(os.path.dirname(log_path), exist_ok=True)
107
108        HandleKwargs.before_msg(raw_kwargs)
109        with open(log_path, 'at', encoding='utf-8') as log_file:
110            process = subprocess.Popen(cmd,
111                                       stdout=subprocess.PIPE,
112                                       stderr=subprocess.STDOUT,
113                                       encoding='utf-8',
114                                       env=exec_env,
115                                       errors="ignore",
116                                       **kwargs)
117            for line in iter(process.stdout.readline, ''):
118                keep_deal, new_line = HandleKwargs.handle_line(line, raw_kwargs)
119                if keep_deal:
120                    log_file.write(new_line)
121                    HandleKwargs.handle_print(new_line, log_mode)
122
123        process.wait()
124        HandleKwargs.after_msg(raw_kwargs)
125        HandleKwargs.clear_log_stage(raw_kwargs)
126
127        ret_code = process.returncode
128        if ret_code != 0:
129            LogUtil.get_failed_log(log_path)
130        return ret_code
131
132    @staticmethod
133    def get_current_time(time_type: str = 'default'):
134        if time_type == 'timestamp':
135            return int(datetime.utcnow().timestamp() * 1000)
136        if time_type == 'datetime':
137            return datetime.now().strftime('%Y-%m-%d %H:%M:%S')
138        return datetime.now().replace(microsecond=0)
139
140
141class ExecEnviron:
142    def __init__(self):
143        self._env = None
144
145    @property
146    def allenv(self):
147        return self._env
148
149    @property
150    def allkeys(self):
151        if self._env is None:
152            return []
153        return list(self._env.keys())
154
155    def initenv(self):
156        self._env = os.environ.copy()
157
158    def allow(self, allowed_vars: list):
159        if self._env is not None:
160            allowed_env = {k: v for k, v in self._env.items() if k in allowed_vars}
161            self._env = allowed_env
162