1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3 4# 5# Copyright (c) 2023 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import shutil 20import os 21import threading 22import subprocess 23import re 24import sys 25from enum import Enum 26 27from containers.status import throw_exception 28from exceptions.ohos_exception import OHOSException 29from services.interface.build_file_generator_interface import BuildFileGeneratorInterface 30from util.system_util import SystemUtil 31from util.component_util import ComponentUtil 32from resources.global_var import CURRENT_OHOS_ROOT, set_hpm_check_info 33from resources.global_var import get_hpm_check_info 34from util.log_util import LogUtil 35 36 37class CMDTYPE(Enum): 38 BUILD = 1 39 INSTALL = 2 40 PACKAGE = 3 41 PUBLISH = 4 42 UPDATE = 5 43 44 45class Hpm(BuildFileGeneratorInterface): 46 47 def __init__(self): 48 super().__init__() 49 50 def run(self): 51 self._regist_hpm_path() 52 self.execute_hpm_cmd(CMDTYPE.BUILD) 53 54 @throw_exception 55 def execute_hpm_cmd(self, cmd_type: int, **kwargs): 56 if cmd_type == CMDTYPE.BUILD: 57 return self._execute_hpm_build_cmd() 58 elif cmd_type == CMDTYPE.INSTALL: 59 return self._execute_hpm_install_cmd() 60 elif cmd_type == CMDTYPE.PACKAGE: 61 return self._execute_hpm_pack_cmd() 62 elif cmd_type == CMDTYPE.PUBLISH: 63 return self._execute_hpm_publish_cmd() 64 elif cmd_type == CMDTYPE.UPDATE: 65 return self._execute_hpm_update_cmd() 66 else: 67 raise OHOSException( 68 'You are tring to use an unsupported hpm cmd type "{}"'.format(cmd_type), '3001') 69 70 '''Description: Get hpm excutable path and regist it 71 @parameter: none 72 @return: Status 73 ''' 74 75 @throw_exception 76 def _check_hpm_version(self, cmd, current_hpm): 77 proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) 78 try: 79 out, err = proc.communicate(timeout=5) 80 except subprocess.TimeoutExpired: 81 proc.kill() 82 if proc.returncode == 0: 83 latest_hpm_version = "" 84 pattern = r'^@ohos/hpm-cli\s*\|(?:[^|]*\|){3}([^|]*)' 85 for line in out.splitlines(): 86 match = re.match(pattern, line) 87 if match: 88 latest_hpm_version = match.group(1).strip() 89 break 90 if latest_hpm_version and latest_hpm_version != current_hpm: 91 set_hpm_check_info( 92 "your current hpm version is not the latest, consider update hpm: bash build/prebuilts_config.sh") 93 94 @throw_exception 95 def _regist_hpm_path(self): 96 hpm_path = shutil.which("hpm") 97 if hpm_path and os.path.exists(hpm_path): 98 self.exec = hpm_path 99 elif os.path.exists(os.path.join(CURRENT_OHOS_ROOT, "prebuilts/hpm/node_modules/.bin/hpm")): 100 self.exec = os.path.join(CURRENT_OHOS_ROOT, "prebuilts/hpm/node_modules/.bin/hpm") 101 else: 102 print("There is no hpm executable file: please execute 'bash build/prebuilt_config.sh' ") 103 raise OHOSException( 104 'There is no hpm executable file at {}'.format(hpm_path), '0001') 105 106 current_hpm_version = subprocess.run([self.exec, "-V"], capture_output=True, text=True).stdout.strip() 107 npm_path = os.path.join(CURRENT_OHOS_ROOT, "prebuilts/build-tools/common/nodejs/current/bin/npm") 108 cmd = npm_path + " search hpm-cli --registry https://registry.npmjs.org/" 109 cmd = cmd.split() 110 thread = threading.Thread(target=self._check_hpm_version, args=(cmd, current_hpm_version)) 111 thread.start() 112 113 @throw_exception 114 def _execute_hpm_build_cmd(self, **kwargs): 115 if self.flags_dict.get("skip-download") or self.flags_dict.get("fast-rebuild"): 116 return 117 else: 118 self.flags_dict.pop("skip-download") 119 LogUtil.hb_info("Tips: If you want to skip download binary dependencies, please use --skip-download") 120 hpm_build_cmd = [self.exec, "build"] + self._convert_flags() 121 variant = hpm_build_cmd[hpm_build_cmd.index("--variant") + 1] 122 logpath = os.path.join('out', variant, 'build.log') 123 if os.path.exists(logpath): 124 mtime = os.stat(logpath).st_mtime 125 os.rename(logpath, '{}/build.{}.log'.format(os.path.dirname(logpath), mtime)) 126 self._run_hpm_cmd(hpm_build_cmd, log_path=logpath) 127 128 @throw_exception 129 def _execute_hpm_install_cmd(self, **kwargs): 130 hpm_install_cmd = [self.exec, "install"] + self._convert_flags() 131 self._run_hpm_cmd(hpm_install_cmd) 132 133 @throw_exception 134 def _execute_hpm_pack_cmd(self, **kwargs): 135 hpm_pack_cmd = [self.exec, "pack", "-t"] + self._convert_flags() 136 self._run_hpm_cmd(hpm_pack_cmd) 137 138 @throw_exception 139 def _execute_hpm_publish_cmd(self, **kwargs): 140 hpm_publish_cmd = [self.exec, "publish", "-t"] + self._convert_flags() 141 self._run_hpm_cmd(hpm_publish_cmd) 142 143 @throw_exception 144 def _execute_hpm_update_cmd(self, **kwargs): 145 hpm_update_cmd = [self.exec, "update"] + self._convert_flags() 146 self._run_hpm_cmd(hpm_update_cmd) 147 148 def _run_hpm_cmd(self, cmd, log_path): 149 LogUtil.hb_info(f"Hpm cmd is: {cmd}") 150 ret_code = SystemUtil.exec_command( 151 cmd, 152 log_path=log_path, 153 pre_msg="start run hpm command", 154 after_msg="end hpm command", 155 custom_line_handle=self._custom_line_handle, 156 ) 157 hpm_info = get_hpm_check_info() 158 if hpm_info: 159 print(hpm_info) 160 if ret_code != 0: 161 raise OHOSException(f"ERROR: hpm command failed, cmd: {cmd}", "0001") 162 163 164 def _custom_line_handle(self, line): 165 """ 166 Handle the output line from the hpm command. 167 Args: 168 line (str): The output line from the hpm command. 169 Returns: 170 tuple: A tuple containing a boolean indicating whether the line should be processed, 171 and the processed line (or an empty string if the line should be skipped). 172 """ 173 if not hasattr(self, "custom_line_handle_preline"): 174 setattr(self, "custom_line_handle_preline", "") 175 176 if line.strip() == "" and "Extracting" in self.custom_line_handle_preline: 177 self.custom_line_handle_preline = line 178 return False, "" 179 180 if "Extracting..." in line: 181 if "Extracted successfully." in line: 182 if "DISABLE_PROGRESS" not in os.environ: 183 sys.stdout.write("\r") 184 self.custom_line_handle_preline = line 185 return True, "Extracted successfully.\n" 186 else: 187 if "DISABLE_PROGRESS" not in os.environ: 188 sys.stdout.write(f"\r[OHOS INFO] {line.strip()}") 189 self.custom_line_handle_preline = line 190 return False, "" 191 else: 192 self.custom_line_handle_preline = line 193 return True, line 194 195 def _convert_args(self) -> list: 196 ''' 197 Description: Convert all registed args into a list 198 @parameter: none 199 @return: list of all registed args 200 ''' 201 args_list = [] 202 203 for key, value in self.args_dict.items(): 204 if isinstance(value, bool): 205 args_list.append('{}={}'.format(key, str(value).lower())) 206 207 elif isinstance(value, str): 208 args_list.append('{}="{}"'.format(key, value)) 209 210 elif isinstance(value, int): 211 args_list.append('{}={}'.format(key, value)) 212 213 elif isinstance(value, list): 214 args_list.append('{}="{}"'.format(key, "&&".join(value))) 215 216 return args_list 217 218 def _convert_flags(self) -> list: 219 ''' 220 Description: Convert all registed flags into a list 221 @parameter: none 222 @return: list of all registed flags 223 ''' 224 flags_list = [] 225 226 for key, value in self.flags_dict.items(): 227 # 部件参数无需参数名 228 if key == "part_name": 229 flags_list.append(str(value)) 230 else: 231 if value == '': 232 flags_list.append('--{}'.format(key)) 233 elif key == 'path': 234 flags_list.extend(['--{}'.format(key), '{}'.format(str(value))]) 235 else: 236 flags_list.extend(['--{}'.format(key).lower(), '{}'.format(str(value))]) 237 238 if "--buildtype" in flags_list: 239 flags_list[flags_list.index("--buildtype")] = "-t" 240 241 return flags_list 242 243 def _check_parts_validity(self, components: list): 244 illegal_components = [] 245 for component in components: 246 if not ComponentUtil.search_bundle_file(component): 247 illegal_components.append(component) 248 if illegal_components: 249 raise OHOSException('ERROR argument "--parts": Invalid parts "{}". '.format(illegal_components)) 250