• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3
4#
5# Copyright (c) 2020 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19from collections import namedtuple
20import os
21import shutil
22
23from hb_internal.common.utils import exec_command
24from hb_internal.common.utils import makedirs
25from hb_internal.common.utils import read_yaml_file
26from hb_internal.common.utils import hb_info
27from hb_internal.common.utils import hb_warning
28from hb_internal.common.config import Config
29
30
class Packer():
    """Build the file-system directories of a product as described by fs.yml.

    For each file-system entry read from ``<product_path>/fs.yml`` the steps
    stored in ``packing_process`` are executed in order: move shared
    libraries, create/copy directories, create symlinks, apply file modes,
    run the image-making commands and finally reset the recorded modes.
    """

    def __init__(self) -> None:
        """Load the build configuration and set up the packing pipeline."""
        self.config = Config()
        # Placeholders that may appear in fs.yml strings.  ``${fs_dir}`` is
        # added later by create_fs_dirs().
        self.replace_items = {
            r'${product_name}': self.config.product,
            r'${root_path}': self.config.root_path,
            r'${out_path}': self.config.out_path
        }
        # Steps run, in this order, for every file-system entry.
        self.packing_process = [
            self.mv_usr_libs, self.create_fs_dirs, self.fs_link,
            self.fs_filemode, self.fs_make_cmd, self.fs_tear_down
        ]
        # The fs.yml entry currently being processed (set by fs_make()).
        self.fs_cfg = None
        # (path, mode) pairs collected while copying; applied by
        # fs_filemode() and consumed by fs_tear_down().
        self.chmod_dirs = []

    def mv_usr_libs(self):
        """Move every ``lib*.so`` found directly in out_path to usr/lib."""
        src_path = self.config.out_path
        # List before creating the target so only pre-existing entries of
        # out_path are considered.
        libs = [lib for lib in os.listdir(src_path) if self.is_lib(lib)]
        target_path = os.path.join(src_path, 'usr', 'lib')
        makedirs(target_path, exist_ok=True)

        for lib in libs:
            shutil.move(os.path.join(src_path, lib),
                        os.path.join(target_path, lib))

    @classmethod
    def is_lib(cls, lib):
        """Return True when the file name looks like ``lib<name>.so``."""
        return lib.startswith('lib') and lib.endswith('.so')

    @classmethod
    def is_incr(cls, fs_incr):
        """Translate the ``fs_incr`` setting into makedirs() flags.

        Returns:
            ``(exist_ok, with_rm)``: ``(False, True)`` when ``fs_incr`` is
            None, ``(True, False)`` for any other value.
        """
        incremental = fs_incr is not None
        return incremental, not incremental

    def create_fs_dirs(self):
        """Create the file-system root and populate its ``fs_dirs`` entries.

        Registers the root directory as the ``${fs_dir}`` placeholder.  An
        entry without ``target_dir`` is skipped; an entry whose source is
        empty or missing only gets an (empty) target directory.
        """
        fs_path = os.path.join(self.config.out_path,
                               self.fs_cfg.get('fs_dir_name', 'rootfs'))
        exist_ok, with_rm = self.is_incr(self.fs_cfg.get('fs_incr', None))

        # NOTE(review): with_rm presumably removes an existing directory
        # first - confirm against hb_internal.common.utils.makedirs.
        makedirs(fs_path, exist_ok=exist_ok, with_rm=with_rm)
        self.replace_items[r'${fs_dir}'] = fs_path

        for fs_dir in self.fs_cfg.get('fs_dirs', []):
            source_dir = fs_dir.get('source_dir', '')
            target_dir = fs_dir.get('target_dir', '')
            if target_dir == '':
                continue

            # Paths without a placeholder are relative to out_path (source)
            # or to the file-system root (target).
            source_path = self.fs_dirs_replace(source_dir,
                                               self.config.out_path)
            target_path = self.fs_dirs_replace(target_dir, fs_path)

            if source_dir == '' or not os.path.exists(source_path):
                makedirs(target_path)
                target_mode_tuple = (target_path, fs_dir.get('dir_mode', 755))
                self.chmod_dirs.append(target_mode_tuple)
                continue

            self.copy_files(source_path, target_path, fs_dir)

    def fs_dirs_replace(self, path, default_path):
        """Expand placeholders in ``path``.

        When ``path`` contains no placeholder it is joined onto
        ``default_path`` instead.
        """
        source_path, is_changed = self.replace(path)
        if not is_changed:
            source_path = os.path.join(default_path, path)
        return source_path

    def copy_files(self, spath, tpath, fs_dir):
        """Copy a file, or a whole directory tree, from spath into tpath.

        Args:
            spath: source file or directory.
            tpath: target directory.
            fs_dir: the fs.yml entry; ``ignore_files``, ``dir_mode`` and
                ``file_mode`` are read from it.

        Every created directory and copied file is recorded in
        ``chmod_dirs`` together with its mode; the modes are applied later.
        """
        ignore_files = fs_dir.get('ignore_files', [])
        dir_mode = fs_dir.get('dir_mode', 755)
        file_mode = fs_dir.get('file_mode', 555)

        def copy_file_process(source_path, target_path):
            if not os.path.isdir(target_path):
                makedirs(target_path)
                self.chmod_dirs.append((target_path, dir_mode))
            tfile = os.path.join(target_path, os.path.basename(source_path))
            try:
                # Symlinks are copied as links, not as the file they point to.
                shutil.copy(source_path, tfile, follow_symlinks=False)
                self.chmod_dirs.append((tfile, file_mode))
            except FileExistsError:
                hb_warning(f'Target file: {tfile} already exists!')

        if os.path.isfile(spath):
            copy_file_process(spath, tpath)
            return

        for sdir, sfile in self.list_all_files(spath, ignore_files):
            # Mirror the directory layout below spath under tpath.  A
            # relative path is used instead of str.replace so that only the
            # leading spath component is substituted.
            rel_dir = os.path.relpath(sdir, spath)
            if rel_dir == os.curdir:
                tdirname = tpath
            else:
                tdirname = os.path.join(tpath, rel_dir)
            copy_file_process(sfile, tdirname)

    @classmethod
    def chmod(cls, file, mode):
        """Apply ``mode`` to ``file`` if it exists.

        ``mode`` is written the way it is read, e.g. ``755``: its decimal
        digits are interpreted as an octal number.
        """
        mode = int(str(mode), base=8)
        if os.path.exists(file):
            os.chmod(file, mode)

    @classmethod
    def filter(cls, files, ignore_list):
        """Drop names that start or end with any entry of ``ignore_list``.

        Returns ``files`` itself when ``ignore_list`` is None or empty,
        otherwise a new list.
        """
        if not ignore_list:
            return files
        patterns = tuple(ignore_list)
        return [
            file for file in files
            if not (file.startswith(patterns) or file.endswith(patterns))
        ]

    def list_all_files(self, path, ignore_list=None):
        """Yield ``(directory, file_path)`` for every file below ``path``.

        File names matching ``ignore_list`` (see filter()) are skipped.
        """
        for dirpath, _, files in os.walk(path):
            for file in self.filter(files, ignore_list):
                # os.walk already yields dirpath prefixed with ``path``;
                # joining ``path`` again would duplicate a relative prefix.
                full_path = os.path.join(dirpath, file)
                if os.path.isfile(full_path):
                    yield dirpath, full_path

    def replace(self, raw_str):
        """Expand all known placeholders in ``raw_str``.

        Returns:
            ``(expanded, changed)`` where ``changed`` tells whether any
            substitution altered the string.
        """
        old_str = raw_str
        for old, new in self.replace_items.items():
            raw_str = raw_str.replace(old, new)
        return raw_str, old_str != raw_str

    def fs_link(self):
        """Create the symlinks listed under ``fs_symlink``.

        An existing entry at the link location is removed first.
        """
        for symlink in self.fs_cfg.get('fs_symlink', []):
            source, _ = self.replace(symlink.get('source', ''))
            link_name, _ = self.replace(symlink.get('link_name', ''))
            # lexists() is also true for a dangling symlink, which exists()
            # would miss and os.symlink() would then fail on.
            if os.path.lexists(link_name):
                os.remove(link_name)
            os.symlink(source, link_name)

    def fs_filemode(self):
        """Apply all recorded modes plus those listed under ``fs_filemode``.

        ``file_dir`` entries are relative to the ``${fs_dir}`` directory;
        entries that do not exist or have no positive mode are ignored.
        """
        for filestat in self.fs_cfg.get('fs_filemode', []):
            file_dir = os.path.join(self.replace_items[r'${fs_dir}'],
                                    filestat.get('file_dir', ''))
            file_mode = filestat.get('file_mode', 0)
            if os.path.exists(file_dir) and file_mode > 0:
                self.chmod_dirs.append((file_dir, file_mode))

        for file_dir, file_mode in self.chmod_dirs:
            self.chmod(file_dir, file_mode)

    def fs_make_cmd(self):
        """Run every command of ``fs_make_cmd`` after placeholder expansion."""
        log_path = self.config.log_path

        for cmd in self.fs_cfg.get('fs_make_cmd', []):
            cmd, _ = self.replace(cmd)
            # Split on any run of whitespace so repeated spaces do not turn
            # into empty arguments.
            exec_command(cmd.split(), log_path=log_path)

    def fs_tear_down(self):
        """Reset every recorded path to 555 (file) / 755 (directory).

        ``chmod_dirs`` is emptied in the process, last recorded path first.
        """
        while self.chmod_dirs:
            tfile = self.chmod_dirs.pop()[0]

            if os.path.isfile(tfile):
                self.chmod(tfile, 555)
            elif os.path.isdir(tfile):
                self.chmod(tfile, 755)

    def fs_attr_process(self, fs_cfg):
        """Merge the ``fs_attr`` sections enabled in the build config.

        For every attribute name that is present in ``config.fs_attr`` its
        key/value pairs are merged into ``fs_cfg`` (in place): existing keys
        are extended with ``+=``, new keys are added.

        Returns:
            The same ``fs_cfg`` object.
        """
        fs_attr = fs_cfg.get('fs_attr', {})
        for attr_key, attr_value in fs_attr.items():
            if attr_key in self.config.fs_attr:
                for target_key, target_value in attr_value.items():
                    if target_key in fs_cfg:
                        fs_cfg[target_key] += target_value
                    else:
                        fs_cfg[target_key] = target_value

        return fs_cfg

    def fs_make(self, cmd_args):
        """Pack every file system described in ``<product_path>/fs.yml``.

        Does nothing (apart from an info message) when fs.yml is missing or
        when ``config.fs_attr`` is None.  Entries without ``fs_dir_name``
        are skipped.  ``cmd_args`` is accepted but not used here.
        """
        fs_cfg_path = os.path.join(self.config.product_path, 'fs.yml')
        if not os.path.isfile(fs_cfg_path):
            hb_info(f'{fs_cfg_path} not found, stop packing fs. '
                    'If the product does not need to be packaged, ignore it.')
            return
        if self.config.fs_attr is None:
            hb_info('component compiling, no need to pack fs')
            return

        fs_cfg_list = read_yaml_file(fs_cfg_path)
        for fs_cfg in fs_cfg_list:
            self.fs_cfg = self.fs_attr_process(fs_cfg)
            if self.fs_cfg.get('fs_dir_name', None) is None:
                continue

            for fs_process_func in self.packing_process:
                fs_process_func()
if __name__ == "__main__":
    # Manual smoke run: replace the real configuration with an empty
    # stand-in and invoke the packer once.
    packer = Packer()
    ConfigTest = namedtuple(
        'config',
        ['product', 'root_path', 'out_path', 'log_path', 'product_path', 'fs_attr'])
    # One value per declared field: five empty paths/names followed by the
    # fs_attr set (a missing value would raise TypeError here).
    packer.config = ConfigTest('', '', '', '', '', set())
    packer.config.fs_attr.add('dmverity_enable')
    # Rebuild the placeholder table so it reflects the stand-in config.
    packer.replace_items = {
        r'${product_name}': packer.config.product,
        r'${root_path}': packer.config.root_path,
        r'${out_path}': packer.config.out_path
    }
    packer.fs_make('')
242