• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3
4#
5# Copyright (c) 2023 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17
18import os
19import re
20from collections import defaultdict
21
22from util.io_util import IoUtil
23from exceptions.ohos_exception import OHOSException
24from resources.config import Config
25from containers.status import throw_exception
26
27from helper.noInstance import NoInstance
28
29
30class ProductUtil(metaclass=NoInstance):
31
32    @staticmethod
33    def get_products():
34        config = Config()
35        # ext products configuration
36        _ext_scan_path = os.path.join(config.root_path,
37                                      'out/products_ext/vendor')
38        if os.path.exists(_ext_scan_path):
39            for company in os.listdir(_ext_scan_path):
40                company_path = os.path.join(_ext_scan_path, company)
41                if not os.path.isdir(company_path):
42                    continue
43
44                for product in os.listdir(company_path):
45                    p_config_path = os.path.join(company_path, product)
46                    config_path = os.path.join(p_config_path, 'config.json')
47
48                    if os.path.isfile(config_path):
49                        info = IoUtil.read_json_file(config_path)
50                        product_name = info.get('product_name')
51                        if info.get('product_path'):
52                            product_path = os.path.join(
53                                config.root_path, info.get('product_path'))
54                        else:
55                            product_path = p_config_path
56                        if product_name is not None:
57                            subsystem_config_overlay_path = os.path.join(product_path,
58                                'subsystem_config_overlay.json')
59                            if os.path.isfile(subsystem_config_overlay_path):
60                                yield {
61                                    'company': company,
62                                    "name": product_name,
63                                    'product_config_path': p_config_path,
64                                    'product_path': product_path,
65                                    'version': info.get('version', '3.0'),
66                                    'os_level': info.get('type', "mini"),
67                                    'build_out_path': info.get('build_out_path'),
68                                    'subsystem_config_json':
69                                    info.get('subsystem_config_json'),
70                                    'subsystem_config_overlay_json':
71                                    subsystem_config_overlay_path,
72                                    'config': config_path,
73                                    'component_type': info.get('component_type', '')
74                                }
75                            else:
76                                yield {
77                                    'company': company,
78                                    "name": product_name,
79                                    'product_config_path': p_config_path,
80                                    'product_path': product_path,
81                                    'version': info.get('version', '3.0'),
82                                    'os_level': info.get('type', "mini"),
83                                    'build_out_path': info.get('build_out_path'),
84                                    'subsystem_config_json':
85                                    info.get('subsystem_config_json'),
86                                    'config': config_path,
87                                    'component_type': info.get('component_type', '')
88                                }
89        if config.vendor_path != '':
90            for company in os.listdir(config.vendor_path):
91                company_path = os.path.join(config.vendor_path, company)
92                if not os.path.isdir(company_path):
93                    continue
94
95                for product in os.listdir(company_path):
96                    product_path = os.path.join(company_path, product)
97                    config_path = os.path.join(product_path, 'config.json')
98
99                    if os.path.isfile(config_path):
100                        info = IoUtil.read_json_file(config_path)
101                        product_name = info.get('product_name')
102                        if product_name is not None:
103                            yield {
104                                'company': company,
105                                "name": product_name,
106                                'product_config_path': product_path,
107                                'product_path': product_path,
108                                'version': info.get('version', '3.0'),
109                                'os_level': info.get('type', "mini"),
110                                'config': config_path,
111                                'component_type': info.get('component_type', '')
112                            }
113        bip_path = config.built_in_product_path
114        for item in os.listdir(bip_path):
115            if item[0] in ".":
116                continue
117            else:
118                product_name = item[0:-len('.json')
119                                    ] if item.endswith('.json') else item
120                config_path = os.path.join(bip_path, item)
121                info = IoUtil.read_json_file(config_path)
122                yield {
123                    'company': 'built-in',
124                    "name": product_name,
125                    'product_config_path': bip_path,
126                    'product_path': bip_path,
127                    'version': info.get('version', '2.0'),
128                    'os_level': info.get('type', 'standard'),
129                    'config': config_path,
130                    'component_type': info.get('component_type', '')
131                }
132
133        bipl_path = config.built_in_product_path_for_llvm
134        if os.path.isdir(bipl_path):
135            for item in os.listdir(bipl_path):
136                if item[0] in ".":
137                    continue
138                else:
139                    product_name = item[0:-len('.json')
140                                        ] if item.endswith('.json') else item
141                    config_path = os.path.join(bipl_path, item)
142                    info = IoUtil.read_json_file(config_path)
143                    yield {
144                        'company': 'built-in',
145                        "name": product_name,
146                        'product_config_path': bipl_path,
147                        'product_path': bipl_path,
148                        'version': info.get('version', '2.0'),
149                        'os_level': info.get('type', 'standard'),
150                        'config': config_path,
151                        'component_type': info.get('component_type', '')
152                    }
153
154    @staticmethod
155    @throw_exception
156    def get_device_info(product_json: str):
157        info = IoUtil.read_json_file(product_json)
158        config = Config()
159        version = info.get('version', '3.0')
160
161        if version == '3.0':
162            device_company = info.get('device_company')
163            board = info.get('board')
164            _board_path = info.get('board_path')
165            if _board_path and os.path.exists(
166                    os.path.join(config.root_path, _board_path)):
167                board_path = os.path.join(config.root_path, _board_path)
168            else:
169                board_path = os.path.join(config.root_path, 'device',
170                                          device_company, board)
171                # board and soc decoupling feature will add boards
172                # directory path here.
173                if not os.path.exists(board_path):
174                    board_path = os.path.join(config.root_path, 'device',
175                                              'board', device_company, board)
176            board_config_path = None
177            if info.get('board_config_path'):
178                board_config_path = os.path.join(config.root_path,
179                                                 info.get('board_config_path'))
180
181            return {
182                'board': info.get('board'),
183                'kernel': info.get('kernel_type'),
184                'kernel_version': info.get('kernel_version'),
185                'company': info.get('device_company'),
186                'board_path': board_path,
187                'board_config_path': board_config_path,
188                'target_cpu': info.get('target_cpu'),
189                'target_os': info.get('target_os'),
190                'support_cpu': info.get('support_cpu'),
191            }
192        else:
193            raise OHOSException(f'wrong version number in {product_json}')
194
195    @staticmethod
196    @throw_exception
197    def get_all_components(product_json: str):
198        if not os.path.isfile(product_json):
199            raise OHOSException(f'features {product_json} not found')
200
201        config = Config()
202        # Get all inherit files
203        files = [os.path.join(config.root_path, file) for file in IoUtil.read_json_file(
204            product_json).get('inherit', [])]
205        # Add the product config file to last with highest priority
206        files.append(product_json)
207
208        # Read all parts in order
209        all_parts = {}
210        for _file in files:
211            if not os.path.isfile(_file):
212                continue
213            _info = IoUtil.read_json_file(_file)
214            parts = _info.get('parts')
215            if parts:
216                all_parts.update(parts)
217            else:
218                # v3 config files
219                all_parts.update(ProductUtil.get_vendor_parts_list(_info))
220
221        return all_parts
222
223    @staticmethod
224    @throw_exception
225    def get_features(product_json: str):
226        if not os.path.isfile(product_json):
227            raise OHOSException(f'features {product_json} not found')
228
229        config = Config()
230        # Get all inherit files
231        files = [os.path.join(config.root_path, file) for file in IoUtil.read_json_file(
232            product_json).get('inherit', [])]
233        # Add the product config file to last with highest priority
234        files.append(product_json)
235
236        # Read all parts in order
237        all_parts = {}
238        for _file in files:
239            if not os.path.isfile(_file):
240                continue
241            _info = IoUtil.read_json_file(_file)
242            parts = _info.get('parts')
243            if parts:
244                all_parts.update(parts)
245            else:
246                # v3 config files
247                all_parts.update(ProductUtil.get_vendor_parts_list(_info))
248
249        # Get all features
250        features_list = []
251        for part, value in all_parts.items():
252            if "features" not in value:
253                continue
254            for key, val in value["features"].items():
255                _item = ''
256                if isinstance(val, bool):
257                    _item = f'{key}={str(val).lower()}'
258                elif isinstance(val, int):
259                    _item = f'{key}={val}'
260                elif isinstance(val, str):
261                    _item = f'{key}="{val}"'
262                else:
263                    raise Exception(
264                        "part feature '{key}:{val}' type not support.")
265                features_list.append(_item)
266        return features_list
267
268    @staticmethod
269    @throw_exception
270    def get_features_dict(product_json: str):
271        all_parts = ProductUtil.get_all_components(product_json)
272        features_dict = {}
273        for part, value in all_parts.items():
274            if "features" not in value:
275                continue
276            for key, val in value["features"].items():
277                if type(val) in [bool, int, str]:
278                    features_dict[key] = val
279                else:
280                    raise Exception(
281                        "part feature '{key}:{val}' type not support.")
282        return features_dict
283
284    @staticmethod
285    @throw_exception
286    def get_components(product_json: str, subsystems: str):
287        if not os.path.isfile(product_json):
288            raise OHOSException(f'{product_json} not found')
289
290        components_dict = defaultdict(list)
291        product_data = IoUtil.read_json_file(product_json)
292        for subsystem in product_data.get('subsystems', []):
293            sname = subsystem.get('subsystem', '')
294            if not len(subsystems) or sname in subsystems:
295                components_dict[sname] += [
296                    comp['component']
297                    for comp in subsystem.get('components', [])
298                ]
299
300        return components_dict, product_data.get('board', ''),\
301            product_data.get('kernel_type', '')
302
303    @staticmethod
304    @throw_exception
305    def get_product_info(product_name: str, company=None):
306        for product_info in ProductUtil.get_products():
307            cur_company = product_info['company']
308            cur_product = product_info['name']
309            if company:
310                if cur_company == company and cur_product == product_name:
311                    return product_info
312            else:
313                if cur_product == product_name:
314                    return product_info
315
316        raise OHOSException(f'product {product_name}@{company} not found')
317
318    @staticmethod
319    @throw_exception
320    def get_compiler(config_path: str):
321        config = os.path.join(config_path, 'config.gni')
322        if not os.path.isfile(config):
323            return ''
324        compiler_pattern = r'board_toolchain_type ?= ?"(\w+)"'
325        with open(config, 'rt', encoding='utf-8') as config_file:
326            data = config_file.read()
327        compiler_list = re.findall(compiler_pattern, data)
328        if not len(compiler_list):
329            raise OHOSException(f'board_toolchain_type is None in {config}')
330
331        return compiler_list[0]
332
333    @staticmethod
334    def get_vendor_parts_list(config: dict):
335        return _transform(config).get('parts')
336
337    @staticmethod
338    def has_component(product_name: str) -> bool:
339        pass
340
341
342def _transform(config: dict):
343    subsystems = config.get('subsystems')
344    if subsystems:
345        config.pop('subsystems')
346        parts = _from_ss_to_parts(subsystems)
347        config['parts'] = parts
348    return config
349
350
351def _from_ss_to_parts(subsystems: dict):
352    parts = dict()
353    for subsystem in subsystems:
354        ss_name = subsystem.get('subsystem')
355        components = subsystem.get('components')
356        if components:
357            for com in components:
358                com_name = com.get('component')
359                features = com.get('features')
360                syscap = com.get('syscap')
361                exclusions = com.get('exclusions')
362                if features:
363                    pairs = get_features(features)
364                    parts['{}:{}'.format(ss_name, com_name)] = pairs
365                else:
366                    parts['{}:{}'.format(ss_name, com_name)] = dict()
367                if syscap:
368                    pairs = get_syscap(syscap)
369                    parts.get('{}:{}'.format(ss_name, com_name)).update(pairs)
370                if exclusions:
371                    pairs = get_exclusion_modules(exclusions)
372                    parts.get('{}:{}'.format(ss_name, com_name)).update(pairs)
373                # Copy other key-values
374                for key, val in com.items():
375                    if key in ['component', 'features', 'syscap', 'exclusions']:
376                        continue
377                    parts.get('{}:{}'.format(ss_name, com_name)).update(key=val)
378    return parts
379
380
381def get_features(features: dict):
382    feats = {}
383    for feat in features:
384        if not feat:
385            continue
386        match = feat.index("=")
387        if match <= 0:
388            print("Warning: invalid feature [{}]".format(feat))
389            continue
390        key = feat[:match].strip()
391        val = feat[match + 1:].strip().strip('"')
392        if val == 'true':
393            feats[key] = True
394        elif val == 'false':
395            feats[key] = False
396        elif re.match(r'[0-9]+', val):
397            feats[key] = int(val)
398        else:
399            feats[key] = val.replace('\"', '"')
400
401    pairs = dict()
402    pairs['features'] = feats
403    return pairs
404
405
406def get_syscap(syscap: dict):
407    feats = {}
408    for feat in syscap:
409        if not feat:
410            continue
411        if '=' not in feat:
412            raise Exception("Error: invalid syscap [{}]".format(feat))
413        match = feat.index("=")
414        key = feat[:match].strip()
415        val = feat[match + 1:].strip().strip('"')
416        if val == 'true':
417            feats[key] = True
418        elif val == 'false':
419            feats[key] = False
420        elif re.match(r'[0-9]+', val):
421            feats[key] = int(val)
422        else:
423            feats[key] = val.replace('\"', '"')
424
425    pairs = dict()
426    pairs['syscap'] = feats
427    return pairs
428
429
430def get_exclusion_modules(exclusions: str):
431    pairs = dict()
432    pairs['exclusions'] = exclusions
433    return pairs
434