• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19
20import os
21import sys
22import shutil
23import argparse
24sys.path.append(os.path.dirname(os.path.dirname(
25    os.path.dirname(os.path.abspath(__file__)))))
26from hb.resources.config import Config
27from hb.util.log_util import LogUtil
28from hb.util.system_util import SystemUtil
29from hb.util.io_util import IoUtil
30
31
class Packer():
    """Build product file-system trees as described by the product's ``fs.yml``.

    Each entry of ``fs.yml`` is processed by running, in order, the steps
    listed in ``self.packing_process``. The entry currently being processed is
    held in ``self.fs_cfg``.
    """

    def __init__(self, packer_args) -> None:
        """Initialise the placeholder table and the packing pipeline.

        Args:
            packer_args: Accepted for interface compatibility; it is not read
                by this constructor.
        """
        self.config = Config()
        # Placeholder -> value table applied to paths and commands by
        # ``replace``. ``${fs_dir}`` is added later by ``create_fs_dirs``.
        self.replace_items = {
            r'${product_name}': self.config.product,
            r'${root_path}': self.config.root_path,
            r'${out_path}': self.config.out_path
        }
        # Steps executed by ``fs_make`` for every fs config entry, in order.
        self.packing_process = [
            self.mv_usr_libs, self.create_fs_dirs, self.fs_link,
            self.fs_filemode, self.fs_make_cmd, self.fs_tear_down
        ]
        self.fs_cfg = None
        # (path, mode) pairs collected while copying; applied by
        # ``fs_filemode`` and drained by ``fs_tear_down``.
        self.chmod_dirs = []

    def mv_usr_libs(self):
        """Move every ``lib*.so`` found directly in the out path to ``usr/lib``."""
        src_path = self.config.out_path
        libs = [lib for lib in os.listdir(src_path) if self.is_lib(lib)]
        target_path = os.path.join(src_path, 'usr', 'lib')
        os.makedirs(target_path, exist_ok=True)

        for lib in libs:
            shutil.move(os.path.join(src_path, lib),
                        os.path.join(target_path, lib))

    @classmethod
    def is_lib(cls, lib):
        """Return True when *lib* is named like a shared library (``lib*.so``)."""
        return lib.startswith('lib') and lib.endswith('.so')

    @classmethod
    def is_incr(cls, fs_incr):
        """Translate the ``fs_incr`` setting into directory-creation flags.

        Args:
            fs_incr: Value of the ``fs_incr`` key, or None when it is absent.

        Returns:
            tuple: ``(exist_ok, with_rm)``. When ``fs_incr`` is None the result
            is ``(False, True)`` (remove any existing tree and create it
            fresh); for any other value it is ``(True, False)``.
        """
        incremental = fs_incr is not None
        return incremental, not incremental

    def create_fs_dirs(self):
        """Create the fs root directory and populate it from ``fs_dirs``.

        The fs root is ``<out_path>/<fs_dir_name>`` (``rootfs`` by default) and
        is registered as the ``${fs_dir}`` placeholder. For each ``fs_dirs``
        entry with a non-empty ``target_dir``, the source is copied into the
        target; when no source is given or it does not exist, an empty target
        directory is created instead.
        """
        fs_path = os.path.join(self.config.out_path,
                               self.fs_cfg.get('fs_dir_name', 'rootfs'))
        exist_ok, with_rm = self.is_incr(self.fs_cfg.get('fs_incr', None))
        if with_rm and os.path.exists(fs_path):
            shutil.rmtree(fs_path)
        os.makedirs(fs_path, exist_ok=exist_ok)
        self.replace_items[r'${fs_dir}'] = fs_path

        for fs_dir in self.fs_cfg.get('fs_dirs', []):
            source_dir = fs_dir.get('source_dir', '')
            target_dir = fs_dir.get('target_dir', '')
            if target_dir == '':
                continue

            source_path = self.fs_dirs_replace(source_dir,
                                               self.config.out_path)
            target_path = self.fs_dirs_replace(target_dir, fs_path)

            if source_dir == '' or not os.path.exists(source_path):
                # Nothing to copy: just make sure the target directory exists
                # and remember the mode it should receive.
                os.makedirs(target_path, exist_ok=True)
                target_mode_tuple = (target_path, fs_dir.get('dir_mode', 755))
                self.chmod_dirs.append(target_mode_tuple)
                continue

            self.copy_files(source_path, target_path, fs_dir)

    def fs_dirs_replace(self, path, default_path):
        """Resolve *path*, expanding placeholders or anchoring it to a default.

        Args:
            path: Path text that may contain ``${...}`` placeholders.
            default_path: Directory *path* is joined onto when no placeholder
                substitution changed it.

        Returns:
            str: The substituted path, or ``default_path/path``.
        """
        source_path, is_changed = self.replace(path)
        if not is_changed:
            source_path = os.path.join(default_path, path)
        return source_path

    def copy_files(self, spath, tpath, fs_dir):
        """Copy a file or a directory tree from *spath* into *tpath*.

        Every created directory and copied file is appended to
        ``self.chmod_dirs`` together with the mode it should receive.

        Args:
            spath: Source file or directory.
            tpath: Target directory.
            fs_dir: ``fs_dirs`` entry; ``ignore_files``, ``dir_mode`` (default
                755) and ``file_mode`` (default 555) are read from it.
        """
        ignore_files = fs_dir.get('ignore_files', [])
        dir_mode = fs_dir.get('dir_mode', 755)
        file_mode = fs_dir.get('file_mode', 555)

        def copy_file_process(source_path, target_path):
            if not os.path.isdir(target_path):
                os.makedirs(target_path, exist_ok=True)
                self.chmod_dirs.append((target_path, dir_mode))
            tfile = os.path.join(target_path, os.path.basename(source_path))
            try:
                # Use the explicit parameter rather than a variable captured
                # from the enclosing scope. Symlinks are copied as symlinks.
                shutil.copy(source_path, tfile, follow_symlinks=False)
                self.chmod_dirs.append((tfile, file_mode))
            except FileExistsError:
                LogUtil.hb_warning(f'Target file: {tfile} already exists!')

        if os.path.isfile(spath):
            copy_file_process(spath, tpath)
            return

        for srelpath, sfile in self.list_all_files(spath, ignore_files):
            # os.walk roots always start with spath; swap only that leading
            # prefix so a repeated directory name deeper down is left intact.
            tdirname = srelpath.replace(spath, tpath, 1)
            copy_file_process(sfile, tdirname)

    @classmethod
    def chmod(cls, file, mode):
        """Apply *mode* to *file* if it exists.

        Args:
            file: Path to change.
            mode: Permission written with octal digits, e.g. ``755``; its
                string form is parsed in base 8.
        """
        mode = int(str(mode), base=8)
        if os.path.exists(file):
            os.chmod(file, mode)

    @classmethod
    def filter(cls, files, ignore_list):
        """Drop names that start or end with any entry of *ignore_list*.

        Args:
            files: File names to filter.
            ignore_list: Prefixes/suffixes to exclude; None or empty keeps
                everything.

        Returns:
            list: *files* itself when nothing is ignored, otherwise a new list
            of the surviving names in their original order.
        """
        if not ignore_list:
            return files
        patterns = tuple(ignore_list)
        return [file for file in files
                if not (file.startswith(patterns) or file.endswith(patterns))]

    def list_all_files(self, path, ignore_list=None):
        """Yield ``(directory, file_path)`` for every regular file under *path*.

        Args:
            path: Directory to walk.
            ignore_list: Optional prefixes/suffixes passed to ``filter``.

        Yields:
            tuple: The walked directory and the path of a file inside it.
        """
        for relpath, _, files in os.walk(path):
            for file in self.filter(files, ignore_list):
                # os.walk's directory already includes ``path``; joining
                # ``path`` again would duplicate it for relative paths.
                full_path = os.path.join(relpath, file)
                if os.path.isfile(full_path):
                    yield relpath, full_path

    def replace(self, raw_str):
        """Substitute every known placeholder in *raw_str*.

        Returns:
            tuple: ``(new_string, changed)`` where *changed* tells whether any
            substitution altered the text.
        """
        old_str = raw_str
        for old, new in self.replace_items.items():
            raw_str = raw_str.replace(old, new)
        return raw_str, old_str != raw_str

    def fs_link(self):
        """Create the symlinks listed under ``fs_symlink``.

        An existing entry at ``link_name`` is removed first; this includes a
        dangling symlink, which ``os.path.exists`` would not report.
        """
        fs_symlink = self.fs_cfg.get('fs_symlink', [])
        for symlink in fs_symlink:
            source, _ = self.replace(symlink.get('source', ''))
            link_name, _ = self.replace(symlink.get('link_name', ''))
            if os.path.lexists(link_name):
                os.remove(link_name)
            os.symlink(source, link_name)

    def fs_filemode(self):
        """Queue the ``fs_filemode`` overrides, then apply all recorded modes.

        Entries whose path (relative to ``${fs_dir}``) exists and whose
        ``file_mode`` is positive are appended to ``self.chmod_dirs``; then
        every recorded ``(path, mode)`` pair is applied in order.
        """
        fs_filemode = self.fs_cfg.get('fs_filemode', [])
        for filestat in fs_filemode:
            file_dir = os.path.join(self.replace_items[r'${fs_dir}'],
                                    filestat.get('file_dir', ''))
            file_mode = filestat.get('file_mode', 0)
            if os.path.exists(file_dir) and file_mode > 0:
                self.chmod_dirs.append((file_dir, file_mode))

        for file_dir, file_mode in self.chmod_dirs:
            self.chmod(file_dir, file_mode)

    def fs_make_cmd(self):
        """Run each ``fs_make_cmd`` command after placeholder substitution.

        Commands are split on single spaces into an argument list and executed
        through ``SystemUtil.exec_command`` with the configured log path.
        """
        fs_make_cmd = self.fs_cfg.get('fs_make_cmd', [])
        log_path = self.config.log_path

        for cmd in fs_make_cmd:
            cmd, _ = self.replace(cmd)
            cmd = cmd.split(' ')
            SystemUtil.exec_command(cmd, log_path=log_path)

    def fs_tear_down(self):
        """Reset recorded paths to default modes and empty ``chmod_dirs``.

        Files are set to 555 and directories to 755, most recently recorded
        first. NOTE(review): presumably this restores a uniform, writable
        build tree once the image commands have run - confirm.
        """
        while self.chmod_dirs:
            tfile = self.chmod_dirs.pop()[0]

            if os.path.isfile(tfile):
                self.chmod(tfile, 555)
            elif os.path.isdir(tfile):
                self.chmod(tfile, 755)

    def fs_attr_process(self, fs_cfg):
        """Merge attribute-conditional settings into *fs_cfg*.

        For each key of ``fs_cfg['fs_attr']`` that is present in
        ``self.config.fs_attr``, its nested items are merged into the top level
        of *fs_cfg*: appended with ``+=`` when the key already exists, assigned
        otherwise.

        Args:
            fs_cfg: One fs config entry; it is modified in place.

        Returns:
            dict: The same *fs_cfg* object.
        """
        fs_attr = fs_cfg.get('fs_attr', {})
        for attr_key, attr_value in fs_attr.items():
            if attr_key in self.config.fs_attr:
                for target_key, target_value in attr_value.items():
                    if target_key in fs_cfg:
                        fs_cfg[target_key] += target_value
                    else:
                        fs_cfg[target_key] = target_value

        return fs_cfg

    def fs_make(self, cmd_args):
        """Pack every file system described in ``<product_path>/fs.yml``.

        Returns early when ``fs.yml`` is missing or when
        ``self.config.fs_attr`` is None. Entries without ``fs_dir_name`` are
        skipped.

        Args:
            cmd_args: Accepted for interface compatibility; not read here.
        """
        fs_cfg_path = os.path.join(self.config.product_path, 'fs.yml')
        if not os.path.isfile(fs_cfg_path):
            LogUtil.hb_info(f'{fs_cfg_path} not found, stop packing fs. '
                          'If the product does not need to be packaged, ignore it.')
            return
        if self.config.fs_attr is None:
            LogUtil.hb_info('component compiling, no need to pack fs')
            return

        fs_cfg_list = IoUtil.read_yaml_file(fs_cfg_path)
        for fs_cfg in fs_cfg_list:
            self.fs_cfg = self.fs_attr_process(fs_cfg)
            if self.fs_cfg.get('fs_dir_name', None) is None:
                continue

            for fs_process_func in self.packing_process:
                fs_process_func()
231
if __name__ == "__main__":
    # Command-line entry point: every option is mandatory.
    cli = argparse.ArgumentParser()
    for option in ('--product', '--root-path', '--out-path',
                   '--log-path', '--product-path'):
        cli.add_argument(option, required=True)
    options = cli.parse_args()

    fs_packer = Packer(options)

    # Replace the placeholder table built by the constructor with the values
    # supplied on the command line.
    placeholders = {}
    placeholders[r'${product_name}'] = options.product
    placeholders[r'${root_path}'] = options.root_path
    placeholders[r'${out_path}'] = options.out_path
    placeholders[r'${log_path}'] = options.log_path
    placeholders[r'${product_path}'] = options.product_path
    fs_packer.replace_items = placeholders

    fs_packer.fs_make('')
251