1#!/usr/bin/env python 2# -*- coding: utf-8 -*- 3 4# 5# Copyright (c) 2020 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import os 20import re 21import subprocess 22import shutil 23import sys 24import json 25import tarfile 26import zipfile 27import importlib 28from datetime import datetime 29from collections import namedtuple 30 31 32def encode(data, encoding='utf-8'): 33 if sys.version_info.major == 2: 34 return data.encode(encoding) 35 return data 36 37 38def decode(data, encoding='utf-8'): 39 if sys.version_info.major == 2: 40 return data.decode(encoding) 41 return data 42 43 44def remove_path(path): 45 if os.path.exists(path): 46 shutil.rmtree(path) 47 48 49# Read json file data 50def read_json_file(input_file): 51 if not os.path.isfile(input_file): 52 raise OHOSException(f'{input_file} not found') 53 54 with open(input_file, 'rb') as input_f: 55 data = json.load(input_f) 56 return data 57 58 59def dump_json_file(dump_file, json_data): 60 with open(dump_file, 'wt', encoding='utf-8') as json_file: 61 json.dump(json_data, json_file, ensure_ascii=False, indent=2) 62 63 64def read_yaml_file(input_file): 65 if not os.path.isfile(input_file): 66 raise OHOSException(f'{input_file} not found') 67 68 yaml = importlib.import_module('yaml') 69 with open(input_file, 'rt', encoding='utf-8') as yaml_file: 70 try: 71 return yaml.safe_load(yaml_file) 72 except yaml.YAMLError as exc: 73 if hasattr(exc, 'problem_mark'): 74 mark = exc.problem_mark 75 raise OHOSException(f'{input_file} load failed, error line:' 76 f' {mark.line + 1}:{mark.column + 1}') 77 78 79def get_input(msg): 80 try: 81 user_input = input 82 except NameError: 83 raise OHOSException('python2.x not supported') 84 return user_input(msg) 85 86 87def exec_command(cmd, log_path='out/build.log', **kwargs): 88 useful_info_pattern = re.compile(r'\[\d+/\d+\].+') 89 is_log_filter = kwargs.pop('log_filter', False) 90 91 with open(log_path, 'at', encoding='utf-8') as log_file: 92 process = subprocess.Popen(cmd, 93 stdout=subprocess.PIPE, 94 stderr=subprocess.STDOUT, 95 encoding='utf-8', 96 **kwargs) 97 for line in iter(process.stdout.readline, ''): 98 if is_log_filter: 99 info = re.findall(useful_info_pattern, line) 100 if len(info): 101 hb_info(info[0]) 102 else: 103 hb_info(line) 104 log_file.write(line) 105 106 process.wait() 107 ret_code = process.returncode 108 109 if ret_code != 0: 110 if is_log_filter: 111 get_failed_log(log_path) 112 113 raise OHOSException('Please check build log in {}'.format(log_path)) 114 115 116def get_failed_log(log_path): 117 with open(log_path, 'rt', encoding='utf-8') as log_file: 118 data = log_file.read() 119 failed_pattern = re.compile( 120 r'(\[\d+/\d+\].*?)(?=\[\d+/\d+\]|' 121 'ninja: build stopped)', re.DOTALL) 122 failed_log = failed_pattern.findall(data) 123 for log in failed_log: 124 if 'FAILED:' in log: 125 hb_error(log) 126 127 failed_pattern = re.compile(r'(ninja: error:.*?)\n', re.DOTALL) 128 failed_log = failed_pattern.findall(data) 129 for log in failed_log: 130 hb_error(log) 131 132 error_log = os.path.join(os.path.dirname(log_path), 'error.log') 133 if os.path.isfile(error_log): 134 with open(error_log, 'rt', encoding='utf-8') as log_file: 135 hb_error(log_file.read()) 136 137 138def check_output(cmd, **kwargs): 139 try: 140 ret = subprocess.check_output(cmd, 141 stderr=subprocess.STDOUT, 142 universal_newlines=True, 143 **kwargs) 144 except subprocess.CalledProcessError as called_exception: 145 ret = called_exception.output 146 if isinstance(cmd, list): 147 cmd = ' '.join(cmd) 148 raise OHOSException(f'command: "{cmd}" failed\n' 149 f'return code: {ret}\n' 150 f'execution path: {os.getcwd()}') 151 152 return ret 153 154 155def makedirs(path, exist_ok=True, with_rm=False): 156 try: 157 os.makedirs(path) 158 except OSError: 159 if not os.path.isdir(path): 160 raise OHOSException(f"{path} makedirs failed") 161 if with_rm: 162 remove_path(path) 163 return os.makedirs(path) 164 if not exist_ok: 165 raise OHOSException(f"{path} exists, makedirs failed") 166 167 168def get_project_path(json_path): 169 json_data = read_json_file(json_path) 170 171 return json_data.get('root_path') 172 173 174def args_factory(args_dict): 175 if not len(args_dict): 176 raise OHOSException('at least one k_v param is ' 177 'required in args_factory') 178 179 args_cls = namedtuple('Args', [key for key in args_dict.keys()]) 180 args = args_cls(**args_dict) 181 return args 182 183 184def get_current_time(type='default'): 185 if type == 'timestamp': 186 return int(datetime.utcnow().timestamp() * 1000) 187 if type == 'datetime': 188 return datetime.now().strftime('%Y-%m-%d %H:%M:%S') 189 return datetime.now().replace(microsecond=0) 190 191 192class Colors: 193 HEADER = '\033[95m' 194 WARNING = '\033[93m' 195 ERROR = '\033[91m' 196 INFO = '\033[92m' 197 END = '\033[0m' 198 199 200def hb_info(msg): 201 level = 'info' 202 for line in str(msg).splitlines(): 203 sys.stdout.write(message(level, line)) 204 sys.stdout.flush() 205 206 207def hb_warning(msg): 208 level = 'warning' 209 for line in str(msg).splitlines(): 210 sys.stderr.write(message(level, line)) 211 sys.stderr.flush() 212 213 214def hb_error(msg): 215 level = 'error' 216 for line in str(msg).splitlines(): 217 sys.stderr.write(message(level, line)) 218 sys.stderr.flush() 219 220 221def hb_debug(msg): 222 level = 'debug' 223 for line in str(msg).splitlines(): 224 sys.stderr.write(message(level, line)) 225 sys.stderr.flush() 226 227 228def message(level, msg): 229 if isinstance(msg, str) and not msg.endswith('\n'): 230 msg += '\n' 231 if level == 'error': 232 msg = msg.replace('error:', f'{Colors.ERROR}error{Colors.END}:') 233 return f'{Colors.ERROR}[OHOS {level.upper()}]{Colors.END} {msg}' 234 elif level == 'info': 235 return f'[OHOS {level.upper()}] {msg}' 236 else: 237 return f'{Colors.WARNING}[OHOS {level.upper()}]{Colors.END} {msg}' 238 239 240class Singleton(type): 241 _instances = {} 242 243 def __call__(cls, *args, **kwargs): 244 if cls not in cls._instances: 245 cls._instances[cls] = super(Singleton, 246 cls).__call__(*args, **kwargs) 247 return cls._instances[cls] 248 249 250class OHOSException(Exception): 251 pass 252 253 254def download_tool(url, dst, tgt_dir=None): 255 requests = importlib.import_module('requests') 256 try: 257 res = requests.get(url, stream=True, timeout=(5, 9)) 258 except OSError: 259 raise OHOSException(f'download {url} timeout!') 260 261 if res.status_code == 200: 262 hb_info(f'Downloading {url} ...') 263 else: 264 hb_error(f'Downloading {url} failed with code: {res.status_code}!') 265 return res.status_code 266 267 total_size = int(res.headers['content-length']) 268 download_size = 0 269 download_percent = 0 270 271 try: 272 with open(dst, "wb") as f: 273 for chunk in res.iter_content(chunk_size=1024): 274 if chunk: 275 f.write(chunk) 276 download_size += len(chunk) 277 download_percent = round( 278 float(download_size / total_size * 100), 2) 279 print('Progress: %s%%\r' % download_percent, end=' ') 280 hb_info('Download complete!') 281 except OSError: 282 raise OHOSException( 283 f'{url} download failed, please install it manually!') 284 285 if tgt_dir is not None: 286 extract_tool(dst, tgt_dir) 287 288 289def extract_tool(src, tgt_dir): 290 hb_info(f'Extracting to {tgt_dir}, please wait...') 291 try: 292 if tarfile.is_tarfile(src): 293 ef = tarfile.open(src) 294 elif zipfile.is_zipfile(src): 295 ef = zipfile.ZipFile(src) 296 else: 297 raise OHOSException(f'Extract file type not support!') 298 ef.extractall(tgt_dir) 299 ef.close() 300 except OSError: 301 raise OHOSException( 302 f'{src} extract failed, please install it manually!') 303