• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3
4#
5# Copyright (c) 2023 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import shutil
20import os
21import threading
22import subprocess
23import re
24import sys
25from enum import Enum
26
27from containers.status import throw_exception
28from exceptions.ohos_exception import OHOSException
29from services.interface.build_file_generator_interface import BuildFileGeneratorInterface
30from util.system_util import SystemUtil
31from util.component_util import ComponentUtil
32from resources.global_var import CURRENT_OHOS_ROOT, set_hpm_check_info
33from resources.global_var import get_hpm_check_info
34
35
class CMDTYPE(Enum):
    """Kinds of hpm sub-command that ``Hpm.execute_hpm_cmd`` can dispatch."""

    BUILD = 1    # handled by Hpm._execute_hpm_build_cmd
    INSTALL = 2  # handled by Hpm._execute_hpm_install_cmd
    PACKAGE = 3  # handled by Hpm._execute_hpm_pack_cmd
    PUBLISH = 4  # handled by Hpm._execute_hpm_publish_cmd
    UPDATE = 5   # handled by Hpm._execute_hpm_update_cmd
42
43
44class Hpm(BuildFileGeneratorInterface):
45
    def __init__(self):
        """Initialise the base generator, then locate the hpm executable."""
        super().__init__()
        # Resolves the hpm executable via _regist_hpm_path(), which stores
        # the result in self.exec.
        self._regist_hpm_path()
49
    def run(self):
        """Run the hpm BUILD command; the dispatcher's result is discarded."""
        self.execute_hpm_cmd(CMDTYPE.BUILD)
52
53    @throw_exception
54    def execute_hpm_cmd(self, cmd_type: int, **kwargs):
55        if cmd_type == CMDTYPE.BUILD:
56            return self._execute_hpm_build_cmd()
57        elif cmd_type == CMDTYPE.INSTALL:
58            return self._execute_hpm_install_cmd()
59        elif cmd_type == CMDTYPE.PACKAGE:
60            return self._execute_hpm_pack_cmd()
61        elif cmd_type == CMDTYPE.PUBLISH:
62            return self._execute_hpm_publish_cmd()
63        elif cmd_type == CMDTYPE.UPDATE:
64            return self._execute_hpm_update_cmd()
65        else:
66            raise OHOSException(
67                'You are tring to use an unsupported hpm cmd type "{}"'.format(cmd_type), '3001')
68
    '''Description: Get the hpm executable path and register it
    (describes _regist_hpm_path, defined further below)
    @parameter: none
    @return: Status
    '''
73
74    @throw_exception
75    def _check_hpm_version(self, cmd, current_hpm):
76        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
77        try:
78            out, err = proc.communicate(timeout=5)
79        except subprocess.TimeoutExpired:
80            proc.kill()
81        if proc.returncode == 0:
82            latest_hpm_version = ""
83            pattern = r'^@ohos/hpm-cli\s*\|(?:[^|]*\|){3}([^|]*)'
84            for line in out.splitlines():
85                match = re.match(pattern, line)
86                if match:
87                    latest_hpm_version = match.group(1).strip()
88                    break
89            if latest_hpm_version and latest_hpm_version != current_hpm:
90                set_hpm_check_info(
91                    "your current hpm version is not the latest, consider update hpm: bash build/prebuilts_config.sh")
92
93    @throw_exception
94    def _regist_hpm_path(self):
95        hpm_path = shutil.which("hpm")
96        if hpm_path and os.path.exists(hpm_path):
97            self.exec = hpm_path
98        elif os.path.exists(os.path.join(CURRENT_OHOS_ROOT, "prebuilts/hpm/node_modules/.bin/hpm")):
99            self.exec = os.path.join(CURRENT_OHOS_ROOT, "prebuilts/hpm/node_modules/.bin/hpm")
100        else:
101            print("There is no hpm executable file: please execute 'bash build/prebuilt_config.sh' ")
102            raise OHOSException(
103                'There is no hpm executable file at {}'.format(hpm_path), '0001')
104
105        current_hpm_version = subprocess.run([self.exec, "-V"], capture_output=True, text=True).stdout.strip()
106        npm_path = os.path.join(CURRENT_OHOS_ROOT, "prebuilts/build-tools/common/nodejs/current/bin/npm")
107        cmd = npm_path + " search hpm-cli --registry https://registry.npmjs.org/"
108        cmd = cmd.split()
109        thread = threading.Thread(target=self._check_hpm_version, args=(cmd, current_hpm_version))
110        thread.start()
111
112    @throw_exception
113    def _execute_hpm_build_cmd(self, **kwargs):
114        if self.flags_dict.get("skip-download"):
115            return
116        else:
117            self.flags_dict.pop("skip-download")
118            hpm_build_cmd = [self.exec, "build"] + self._convert_flags()
119            variant = hpm_build_cmd[hpm_build_cmd.index("--variant") + 1]
120            logpath = os.path.join('out', variant, 'build.log')
121            if os.path.exists(logpath):
122                mtime = os.stat(logpath).st_mtime
123                os.rename(logpath, '{}/build.{}.log'.format(os.path.dirname(logpath), mtime))
124            self._run_hpm_cmd(hpm_build_cmd, log_path=logpath)
125
126    @throw_exception
127    def _execute_hpm_install_cmd(self, **kwargs):
128        hpm_install_cmd = [self.exec, "install"] + self._convert_flags()
129        self._run_hpm_cmd(hpm_install_cmd)
130
131    @throw_exception
132    def _execute_hpm_pack_cmd(self, **kwargs):
133        hpm_pack_cmd = [self.exec, "pack", "-t"] + self._convert_flags()
134        self._run_hpm_cmd(hpm_pack_cmd)
135
136    @throw_exception
137    def _execute_hpm_publish_cmd(self, **kwargs):
138        hpm_publish_cmd = [self.exec, "publish", "-t"] + self._convert_flags()
139        self._run_hpm_cmd(hpm_publish_cmd)
140
141    @throw_exception
142    def _execute_hpm_update_cmd(self, **kwargs):
143        hpm_update_cmd = [self.exec, "update"] + self._convert_flags()
144        self._run_hpm_cmd(hpm_update_cmd)
145
146    def _run_hpm_cmd(self, cmd, log_path):
147        ret_code = SystemUtil.exec_command(
148            cmd,
149            log_path=log_path,
150            pre_msg="start run hpm command",
151            after_msg="end hpm command",
152            custom_line_handle=self._custom_line_handle,
153        )
154        hpm_info = get_hpm_check_info()
155        if hpm_info:
156            print(hpm_info)
157        if ret_code != 0:
158            raise OHOSException(f"ERROR: hpm command failed, cmd: {cmd}", "0001")
159
160
161    def _custom_line_handle(self, line):
162        """
163        Handle the output line from the hpm command.
164        Args:
165            line (str): The output line from the hpm command.
166        Returns:
167            tuple: A tuple containing a boolean indicating whether the line should be processed,
168                   and the processed line (or an empty string if the line should be skipped).
169        """
170        if not hasattr(self, "custom_line_handle_preline"):
171            setattr(self, "custom_line_handle_preline", "")
172
173        if line.strip() == "" and "Extracting" in self.custom_line_handle_preline:
174            self.custom_line_handle_preline = line
175            return False, ""
176
177        if "Extracting..." in line:
178            if "Extracted successfully." in line:
179                if "DISABLE_PROGRESS" not in os.environ:
180                    sys.stdout.write("\r")
181                self.custom_line_handle_preline = line
182                return True, "Extracted successfully.\n"
183            else:
184                if "DISABLE_PROGRESS" not in os.environ:
185                    sys.stdout.write(f"\r[OHOS INFO]  {line.strip()}")
186                self.custom_line_handle_preline = line
187                return False, ""
188        else:
189            self.custom_line_handle_preline = line
190            return True, line
191
192    def _convert_args(self) -> list:
193        '''
194        Description: Convert all registed args into a list
195        @parameter: none
196        @return: list of all registed args
197        '''
198        args_list = []
199
200        for key, value in self.args_dict.items():
201            if isinstance(value, bool):
202                args_list.append('{}={}'.format(key, str(value).lower()))
203
204            elif isinstance(value, str):
205                args_list.append('{}="{}"'.format(key, value))
206
207            elif isinstance(value, int):
208                args_list.append('{}={}'.format(key, value))
209
210            elif isinstance(value, list):
211                args_list.append('{}="{}"'.format(key, "&&".join(value)))
212
213        return args_list
214
215    def _convert_flags(self) -> list:
216        '''
217        Description: Convert all registed flags into a list
218        @parameter: none
219        @return: list of all registed flags
220        '''
221        flags_list = []
222
223        for key, value in self.flags_dict.items():
224            # 部件参数无需参数名
225            if key == "part_name":
226                flags_list.append(str(value))
227            else:
228                if value == '':
229                    flags_list.append('--{}'.format(key))
230                elif key == 'path':
231                    flags_list.extend(['--{}'.format(key), '{}'.format(str(value))])
232                else:
233                    flags_list.extend(['--{}'.format(key).lower(), '{}'.format(str(value))])
234
235        if "--buildtype" in flags_list:
236            flags_list[flags_list.index("--buildtype")] = "-t"
237
238        return flags_list
239
240    def _check_parts_validity(self, components: list):
241        illegal_components = []
242        for component in components:
243            if not ComponentUtil.search_bundle_file(component):
244                illegal_components.append(component)
245        if illegal_components:
246            raise OHOSException('ERROR argument "--parts": Invalid parts "{}". '.format(illegal_components))
247