1#!/usr/bin/env python 2# -*- coding: utf-8 -*- 3 4# 5# Copyright (c) 2020 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19from collections import namedtuple 20import os 21import shutil 22 23from hb_internal.common.utils import exec_command 24from hb_internal.common.utils import makedirs 25from hb_internal.common.utils import read_yaml_file 26from hb_internal.common.utils import hb_info 27from hb_internal.common.utils import hb_warning 28from hb_internal.common.config import Config 29 30 31class Packer(): 32 def __init__(self) -> None: 33 self.config = Config() 34 self.replace_items = { 35 r'${product_name}': self.config.product, 36 r'${root_path}': self.config.root_path, 37 r'${out_path}': self.config.out_path 38 } 39 self.packing_process = [ 40 self.mv_usr_libs, self.create_fs_dirs, self.fs_link, 41 self.fs_filemode, self.fs_make_cmd, self.fs_tear_down 42 ] 43 self.fs_cfg = None 44 self.chmod_dirs = [] 45 46 def mv_usr_libs(self): 47 src_path = self.config.out_path 48 libs = [lib for lib in os.listdir(src_path) if self.is_lib(lib)] 49 target_path = os.path.join(src_path, 'usr', 'lib') 50 makedirs(target_path, exist_ok=True) 51 52 for lib in libs: 53 source_file = os.path.join(src_path, lib) 54 target_file = os.path.join(target_path, lib) 55 shutil.move(source_file, target_file) 56 57 @classmethod 58 def is_lib(cls, lib): 59 return lib.startswith('lib') and lib.endswith('.so') 60 61 @classmethod 62 def is_incr(cls, fs_incr): 63 exist_ok = False if fs_incr is None else True 64 with_rm = True if fs_incr is None else False 65 return exist_ok, with_rm 66 67 def create_fs_dirs(self): 68 fs_path = os.path.join(self.config.out_path, 69 self.fs_cfg.get('fs_dir_name', 'rootfs')) 70 exist_ok, with_rm = self.is_incr(self.fs_cfg.get('fs_incr', None)) 71 72 makedirs(fs_path, exist_ok=exist_ok, with_rm=with_rm) 73 self.replace_items[r'${fs_dir}'] = fs_path 74 75 for fs_dir in self.fs_cfg.get('fs_dirs', []): 76 source_dir = fs_dir.get('source_dir', '') 77 target_dir = fs_dir.get('target_dir', '') 78 if target_dir == '': 79 continue 80 81 source_path = self.fs_dirs_replace(source_dir, 82 self.config.out_path) 83 target_path = self.fs_dirs_replace(target_dir, fs_path) 84 85 if source_dir == '' or not os.path.exists(source_path): 86 makedirs(target_path) 87 target_mode_tuple = (target_path, fs_dir.get('dir_mode', 755)) 88 self.chmod_dirs.append(target_mode_tuple) 89 continue 90 91 self.copy_files(source_path, target_path, fs_dir) 92 93 def fs_dirs_replace(self, path, default_path): 94 source_path, is_changed = self.replace(path) 95 if not is_changed: 96 source_path = os.path.join(default_path, path) 97 return source_path 98 99 def copy_files(self, spath, tpath, fs_dir): 100 ignore_files = fs_dir.get('ignore_files', []) 101 dir_mode = fs_dir.get('dir_mode', 755) 102 file_mode = fs_dir.get('file_mode', 555) 103 104 def copy_file_process(source_path, target_path): 105 if not os.path.isdir(target_path): 106 makedirs(target_path) 107 self.chmod_dirs.append((target_path, dir_mode)) 108 tfile = os.path.join(target_path, os.path.basename(source_path)) 109 try: 110 shutil.copy(sfile, tfile, follow_symlinks=False) 111 self.chmod_dirs.append((tfile, file_mode)) 112 except FileExistsError: 113 hb_warning(f'Target file: {tfile} already exists!') 114 115 if os.path.isfile(spath): 116 sfile = spath 117 copy_file_process(spath, tpath) 118 return 119 120 for srelpath, sfile in self.list_all_files(spath, ignore_files): 121 tdirname = srelpath.replace(spath, tpath) 122 copy_file_process(sfile, tdirname) 123 124 @classmethod 125 def chmod(cls, file, mode): 126 mode = int(str(mode), base=8) 127 if os.path.exists(file): 128 os.chmod(file, mode) 129 130 @classmethod 131 def filter(cls, files, ignore_list): 132 if ignore_list is None or not len(ignore_list): 133 return files 134 filter_files = [] 135 for file in files: 136 flag = True 137 for ignore in ignore_list: 138 if file.startswith(ignore) or file.endswith(ignore): 139 flag = False 140 break 141 if flag: 142 filter_files.append(file) 143 return filter_files 144 145 def list_all_files(self, path, ignore_list=None): 146 for relpath, _, files in os.walk(path): 147 files = self.filter(files, ignore_list) 148 for file in files: 149 full_path = os.path.join(path, relpath, file) 150 if os.path.isfile(full_path): 151 yield relpath, full_path 152 153 def replace(self, raw_str): 154 old_str = raw_str 155 for old, new in self.replace_items.items(): 156 raw_str = raw_str.replace(old, new) 157 return raw_str, old_str != raw_str 158 159 def fs_link(self): 160 fs_symlink = self.fs_cfg.get('fs_symlink', []) 161 for symlink in fs_symlink: 162 source, _ = self.replace(symlink.get('source', '')) 163 link_name, _ = self.replace(symlink.get('link_name', '')) 164 if os.path.exists(link_name): 165 os.remove(link_name) 166 os.symlink(source, link_name) 167 168 def fs_filemode(self): 169 fs_filemode = self.fs_cfg.get('fs_filemode', []) 170 for filestat in fs_filemode: 171 file_dir = os.path.join(self.replace_items[r'${fs_dir}'], 172 filestat.get('file_dir', '')) 173 file_mode = filestat.get('file_mode', 0) 174 if os.path.exists(file_dir) and file_mode > 0: 175 self.chmod_dirs.append((file_dir, file_mode)) 176 177 for file_dir, file_mode in self.chmod_dirs: 178 self.chmod(file_dir, file_mode) 179 180 def fs_make_cmd(self): 181 fs_make_cmd = self.fs_cfg.get('fs_make_cmd', []) 182 log_path = self.config.log_path 183 184 for cmd in fs_make_cmd: 185 cmd, _ = self.replace(cmd) 186 cmd = cmd.split(' ') 187 exec_command(cmd, log_path=log_path) 188 189 def fs_tear_down(self): 190 while len(self.chmod_dirs): 191 tfile = self.chmod_dirs.pop()[0] 192 193 if os.path.isfile(tfile): 194 self.chmod(tfile, 555) 195 elif os.path.isdir(tfile): 196 self.chmod(tfile, 755) 197 198 def fs_attr_process(self, fs_cfg): 199 fs_attr = fs_cfg.get('fs_attr', {}) 200 for attr_key, attr_value in fs_attr.items(): 201 if attr_key in self.config.fs_attr: 202 for target_key, target_value in attr_value.items(): 203 if target_key in fs_cfg: 204 fs_cfg[target_key] += target_value 205 else: 206 fs_cfg[target_key] = target_value 207 208 return fs_cfg 209 210 def fs_make(self, cmd_args): 211 fs_cfg_path = os.path.join(self.config.product_path, 'fs.yml') 212 if not os.path.isfile(fs_cfg_path): 213 hb_info(f'{fs_cfg_path} not found, stop packing fs. ' 214 'If the product does not need to be packaged, ignore it.') 215 return 216 if self.config.fs_attr is None: 217 hb_info('component compiling, no need to pack fs') 218 return 219 220 fs_cfg_list = read_yaml_file(fs_cfg_path) 221 for fs_cfg in fs_cfg_list: 222 self.fs_cfg = self.fs_attr_process(fs_cfg) 223 if self.fs_cfg.get('fs_dir_name', None) is None: 224 continue 225 226 for fs_process_func in self.packing_process: 227 fs_process_func() 228 229if __name__ == "__main__": 230 packer = Packer() 231 ConfigTest = namedtuple( 232 'config', 233 ['product', 'root_path', 'out_path', 'log_path', 'product_path', 'fs_attr']) 234 packer.config = ConfigTest('', '', '', '', set()) 235 packer.config.fs_attr.add('dmverity_enable') 236 packer.replace_items = { 237 r'${product_name}': packer.config.product, 238 r'${root_path}': packer.config.root_path, 239 r'${out_path}': packer.config.out_path 240 } 241 packer.fs_make('') 242