1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3 4# 5# Copyright (c) 2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19 20import os 21import sys 22import shutil 23import argparse 24sys.path.append(os.path.dirname(os.path.dirname( 25 os.path.dirname(os.path.abspath(__file__))))) 26from hb.resources.config import Config 27from hb.util.log_util import LogUtil 28from hb.util.system_util import SystemUtil 29from hb.util.io_util import IoUtil 30 31 32class Packer(): 33 def __init__(self, packer_args) -> None: 34 self.config = Config() 35 self.replace_items = { 36 r'${product_name}': self.config.product, 37 r'${root_path}': self.config.root_path, 38 r'${out_path}': self.config.out_path 39 } 40 self.packing_process = [ 41 self.mv_usr_libs, self.create_fs_dirs, self.fs_link, 42 self.fs_filemode, self.fs_make_cmd, self.fs_tear_down 43 ] 44 self.fs_cfg = None 45 self.chmod_dirs = [] 46 47 def mv_usr_libs(self): 48 src_path = self.config.out_path 49 libs = [lib for lib in os.listdir(src_path) if self.is_lib(lib)] 50 target_path = os.path.join(src_path, 'usr', 'lib') 51 os.makedirs(target_path, exist_ok=True) 52 53 for lib in libs: 54 source_file = os.path.join(src_path, lib) 55 target_file = os.path.join(target_path, lib) 56 shutil.move(source_file, target_file) 57 58 @classmethod 59 def is_lib(cls, lib): 60 return lib.startswith('lib') and lib.endswith('.so') 61 62 @classmethod 63 def is_incr(cls, fs_incr): 64 exist_ok = False if fs_incr is None else True 65 with_rm = True if fs_incr is None else False 66 return exist_ok, with_rm 67 68 def create_fs_dirs(self): 69 fs_path = os.path.join(self.config.out_path, 70 self.fs_cfg.get('fs_dir_name', 'rootfs')) 71 exist_ok, with_rm = self.is_incr(self.fs_cfg.get('fs_incr', None)) 72 if with_rm and os.path.exists(fs_path): 73 shutil.rmtree(fs_path) 74 os.makedirs(fs_path, exist_ok=exist_ok) 75 self.replace_items[r'${fs_dir}'] = fs_path 76 77 for fs_dir in self.fs_cfg.get('fs_dirs', []): 78 source_dir = fs_dir.get('source_dir', '') 79 target_dir = fs_dir.get('target_dir', '') 80 if target_dir == '': 81 continue 82 83 source_path = self.fs_dirs_replace(source_dir, 84 self.config.out_path) 85 target_path = self.fs_dirs_replace(target_dir, fs_path) 86 87 if source_dir == '' or not os.path.exists(source_path): 88 os.makedirs(target_path, exist_ok=True) 89 target_mode_tuple = (target_path, fs_dir.get('dir_mode', 755)) 90 self.chmod_dirs.append(target_mode_tuple) 91 continue 92 93 self.copy_files(source_path, target_path, fs_dir) 94 95 def fs_dirs_replace(self, path, default_path): 96 source_path, is_changed = self.replace(path) 97 if not is_changed: 98 source_path = os.path.join(default_path, path) 99 return source_path 100 101 def copy_files(self, spath, tpath, fs_dir): 102 ignore_files = fs_dir.get('ignore_files', []) 103 dir_mode = fs_dir.get('dir_mode', 755) 104 file_mode = fs_dir.get('file_mode', 555) 105 106 def copy_file_process(source_path, target_path): 107 if not os.path.isdir(target_path): 108 os.makedirs(target_path, exist_ok=True) 109 self.chmod_dirs.append((target_path, dir_mode)) 110 tfile = os.path.join(target_path, os.path.basename(source_path)) 111 try: 112 shutil.copy(sfile, tfile, follow_symlinks=False) 113 self.chmod_dirs.append((tfile, file_mode)) 114 except FileExistsError: 115 LogUtil.hb_warning(f'Target file: {tfile} already exists!') 116 117 if os.path.isfile(spath): 118 sfile = spath 119 copy_file_process(spath, tpath) 120 return 121 122 for srelpath, sfile in self.list_all_files(spath, ignore_files): 123 tdirname = srelpath.replace(spath, tpath) 124 copy_file_process(sfile, tdirname) 125 126 @classmethod 127 def chmod(cls, file, mode): 128 mode = int(str(mode), base=8) 129 if os.path.exists(file): 130 os.chmod(file, mode) 131 132 @classmethod 133 def filter(cls, files, ignore_list): 134 if ignore_list is None or not len(ignore_list): 135 return files 136 filter_files = [] 137 for file in files: 138 flag = True 139 for ignore in ignore_list: 140 if file.startswith(ignore) or file.endswith(ignore): 141 flag = False 142 break 143 if flag: 144 filter_files.append(file) 145 return filter_files 146 147 def list_all_files(self, path, ignore_list=None): 148 for relpath, _, files in os.walk(path): 149 files = self.filter(files, ignore_list) 150 for file in files: 151 full_path = os.path.join(path, relpath, file) 152 if os.path.isfile(full_path): 153 yield relpath, full_path 154 155 def replace(self, raw_str): 156 old_str = raw_str 157 for old, new in self.replace_items.items(): 158 raw_str = raw_str.replace(old, new) 159 return raw_str, old_str != raw_str 160 161 def fs_link(self): 162 fs_symlink = self.fs_cfg.get('fs_symlink', []) 163 for symlink in fs_symlink: 164 source, _ = self.replace(symlink.get('source', '')) 165 link_name, _ = self.replace(symlink.get('link_name', '')) 166 if os.path.exists(link_name): 167 os.remove(link_name) 168 os.symlink(source, link_name) 169 170 def fs_filemode(self): 171 fs_filemode = self.fs_cfg.get('fs_filemode', []) 172 for filestat in fs_filemode: 173 file_dir = os.path.join(self.replace_items[r'${fs_dir}'], 174 filestat.get('file_dir', '')) 175 file_mode = filestat.get('file_mode', 0) 176 if os.path.exists(file_dir) and file_mode > 0: 177 self.chmod_dirs.append((file_dir, file_mode)) 178 179 for file_dir, file_mode in self.chmod_dirs: 180 self.chmod(file_dir, file_mode) 181 182 def fs_make_cmd(self): 183 fs_make_cmd = self.fs_cfg.get('fs_make_cmd', []) 184 log_path = self.config.log_path 185 186 for cmd in fs_make_cmd: 187 cmd, _ = self.replace(cmd) 188 cmd = cmd.split(' ') 189 SystemUtil.exec_command(cmd, log_path=log_path) 190 191 def fs_tear_down(self): 192 while len(self.chmod_dirs): 193 tfile = self.chmod_dirs.pop()[0] 194 195 if os.path.isfile(tfile): 196 self.chmod(tfile, 555) 197 elif os.path.isdir(tfile): 198 self.chmod(tfile, 755) 199 200 def fs_attr_process(self, fs_cfg): 201 fs_attr = fs_cfg.get('fs_attr', {}) 202 for attr_key, attr_value in fs_attr.items(): 203 if attr_key in self.config.fs_attr: 204 for target_key, target_value in attr_value.items(): 205 if target_key in fs_cfg: 206 fs_cfg[target_key] += target_value 207 else: 208 fs_cfg[target_key] = target_value 209 210 return fs_cfg 211 212 def fs_make(self, cmd_args): 213 fs_cfg_path = os.path.join(self.config.product_path, 'fs.yml') 214 if not os.path.isfile(fs_cfg_path): 215 LogUtil.hb_info(f'{fs_cfg_path} not found, stop packing fs. ' 216 'If the product does not need to be packaged, ignore it.') 217 return 218 if self.config.fs_attr is None: 219 LogUtil.hb_info('component compiling, no need to pack fs') 220 return 221 222 fs_cfg_list = IoUtil.read_yaml_file(fs_cfg_path) 223 for fs_cfg in fs_cfg_list: 224 self.fs_cfg = self.fs_attr_process(fs_cfg) 225 if self.fs_cfg.get('fs_dir_name', None) is None: 226 continue 227 228 for fs_process_func in self.packing_process: 229 fs_process_func() 230 231 232if __name__ == "__main__": 233 parser = argparse.ArgumentParser() 234 parser.add_argument('--product', required=True) 235 parser.add_argument('--root-path', required=True) 236 parser.add_argument('--out-path', required=True) 237 parser.add_argument('--log-path', required=True) 238 parser.add_argument('--product-path', required=True) 239 args = parser.parse_args() 240 241 packer = Packer(args) 242 243 packer.replace_items = { 244 r'${product_name}': args.product, 245 r'${root_path}': args.root_path, 246 r'${out_path}': args.out_path, 247 r'${log_path}': args.log_path, 248 r'${product_path}': args.product_path 249 } 250 packer.fs_make('') 251