#!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright (c) 2024 Huawei Device Co., Ltd. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import subprocess import sys import stat import os import argparse import shutil import json import time import re import urllib.request def _get_args(): parser = argparse.ArgumentParser(add_help=True) parser.add_argument("-op", "--out_path", default=r"./", type=str, help="path of out.", ) parser.add_argument("-rp", "--root_path", default=r"./", type=str, help="path of root. default: ./", ) parser.add_argument("-cl", "--components_list", default="", type=str, help="components_list , " "pass in the components' name, separated by commas , " "example: A,B,C . " "default: none", ) parser.add_argument("-bt", "--build_type", default=0, type=int, help="build_type ,default: 0", ) parser.add_argument("-on", "--organization_name", default='ohos', type=str, help="organization_name ,default: '' ", ) parser.add_argument("-os", "--os_arg", default=r"linux", type=str, help="path of output file. default: linux", ) parser.add_argument("-ba", "--build_arch", default=r"x86", type=str, help="build_arch_arg. default: x86", ) parser.add_argument("-lt", "--local_test", default=0, type=int, help="local test ,default: not local , 0", ) args = parser.parse_args() return args def create_directories(paths): for path in paths: os.makedirs(path, exist_ok=True) def copy_files(src_dst_pairs): for src, dst in src_dst_pairs: shutil.copy2(src, dst) def generate_common_configs(): return """import("//build/ohos.gni") config("musl_common_configs") { visibility = [ ":*" ] include_dirs = [ "//prebuilts/clang/ohos/linux-x86_64/15.0.4/llvm/include", "//prebuilts/clang/ohos/linux-x86_64/15.0.4/llvm/include/c++/v1", "//prebuilts/clang/ohos/linux-x86_64/15.0.4/llvm/include/c++/v1/__algorithm", "//prebuilts/clang/ohos/linux-x86_64/15.0.4/llvm/include/c++/v1/__bit", "//prebuilts/clang/ohos/linux-x86_64/15.0.4/llvm/include/c++/v1/__charconv", "//prebuilts/clang/ohos/linux-x86_64/15.0.4/llvm/include/c++/v1/__chrono", "//prebuilts/clang/ohos/linux-x86_64/15.0.4/llvm/include/c++/v1/__compare", "//prebuilts/clang/ohos/linux-x86_64/15.0.4/llvm/include/c++/v1/__concepts", "//prebuilts/clang/ohos/linux-x86_64/15.0.4/llvm/include/c++/v1/__debug_utils", "//prebuilts/clang/ohos/linux-x86_64/15.0.4/llvm/include/c++/v1/__concepts", "//prebuilts/clang/ohos/linux-x86_64/15.0.4/llvm/include/c++/v1/__filesystem", "//prebuilts/clang/ohos/linux-x86_64/15.0.4/llvm/include/c++/v1/__format", "//prebuilts/clang/ohos/linux-x86_64/15.0.4/llvm/include/c++/v1/__functional", "//prebuilts/clang/ohos/linux-x86_64/15.0.4/llvm/include/c++/v1/__fwd", "//prebuilts/clang/ohos/linux-x86_64/15.0.4/llvm/include/c++/v1/__ios", "//prebuilts/clang/ohos/linux-x86_64/15.0.4/llvm/include/c++/v1/__iterator", "//prebuilts/clang/ohos/linux-x86_64/15.0.4/llvm/include/c++/v1/__memory", "//prebuilts/clang/ohos/linux-x86_64/15.0.4/llvm/include/c++/v1/__numeric", "//prebuilts/clang/ohos/linux-x86_64/15.0.4/llvm/include/c++/v1/__random", "//prebuilts/clang/ohos/linux-x86_64/15.0.4/llvm/include/c++/v1/__ranges", "//prebuilts/clang/ohos/linux-x86_64/15.0.4/llvm/include/c++/v1/__string", "//prebuilts/clang/ohos/linux-x86_64/15.0.4/llvm/include/c++/v1/__support", "//prebuilts/clang/ohos/linux-x86_64/15.0.4/llvm/include/c++/v1/__thread", "//prebuilts/clang/ohos/linux-x86_64/15.0.4/llvm/include/c++/v1/__type_traits", "//prebuilts/clang/ohos/linux-x86_64/15.0.4/llvm/include/c++/v1/__utility", "//prebuilts/clang/ohos/linux-x86_64/15.0.4/llvm/include/c++/v1/__variant", "//prebuilts/clang/ohos/linux-x86_64/15.0.4/llvm/include/c++/v1/experimental", "//prebuilts/clang/ohos/linux-x86_64/15.0.4/llvm/include/c++/v1/ext", "//prebuilts/clang/ohos/linux-x86_64/15.0.4/llvm/include/arm-linux-ohos/c++/v1", ] cflags_c = [ "-Wno-error=bitwise-op-parentheses", "-Wno-error=shift-op-parentheses", ] } """ def generate_soft_libc_musl_shared_configs(): return """ config("soft_libc_musl_shared_configs") { visibility = [ ":*" ] include_dirs = [ "innerapis/includes", ] } ohos_prebuilt_shared_library("soft_libc_musl_shared") { public_configs = [":soft_libc_musl_shared_configs",":musl_common_configs"] public_external_deps = [ ] source = "innerapis/libs/libc.so" part_name = "musl" subsystem_name = "thirdparty" } """ def generate_soft_libcrypt_configs(): return """ config("soft_libcrypt_configs") { visibility = [ ":*" ] include_dirs = [ "innerapis/includes", ] } ohos_prebuilt_static_library("soft_libcrypt") { public_configs = [":soft_libcrypt_configs",":musl_common_configs"] public_external_deps = [ ] source = "innerapis/libs/libcrypt.a" part_name = "musl" subsystem_name = "thirdparty" } """ def generate_soft_libdl_configs(): return """ config("soft_libdl_configs") { visibility = [ ":*" ] include_dirs = [ "innerapis/includes", ] } ohos_prebuilt_static_library("soft_libdl") { public_configs = [":soft_libdl_configs",":musl_common_configs"] public_external_deps = [ ] source = "innerapis/libs/libdl.a" part_name = "musl" subsystem_name = "thirdparty" } """ def generate_soft_libm_configs(): return """ config("soft_libm_configs") { visibility = [ ":*" ] include_dirs = [ "innerapis/includes", ] } ohos_prebuilt_static_library("soft_libm") { public_configs = [":soft_libm_configs",":musl_common_configs"] public_external_deps = [ ] source = "innerapis/libs/libm.a" part_name = "musl" subsystem_name = "thirdparty" } """ def generate_soft_libpthread_configs(): return """ config("soft_libpthread_configs") { visibility = [ ":*" ] include_dirs = [ "innerapis/includes", ] } ohos_prebuilt_static_library("soft_libpthread") { public_configs = [":soft_libpthread_configs",":musl_common_configs"] public_external_deps = [ ] source = "innerapis/libs/libpthread.a" part_name = "musl" subsystem_name = "thirdparty" } """ def generate_soft_libresolv_configs(): return """ config("soft_libresolv_configs") { visibility = [ ":*" ] include_dirs = [ "innerapis/includes", ] } ohos_prebuilt_static_library("soft_libresolv") { public_configs = [":soft_libresolv_configs",":musl_common_configs"] public_external_deps = [ ] source = "innerapis/libs/libresolv.a" part_name = "musl" subsystem_name = "thirdparty" } """ def generate_soft_librt_configs(): return """ config("soft_librt_configs") { visibility = [ ":*" ] include_dirs = [ "innerapis/includes", ] } ohos_prebuilt_static_library("soft_librt") { public_configs = [":soft_librt_configs",":musl_common_configs"] public_external_deps = [ ] source = "innerapis/libs/librt.a" part_name = "musl" subsystem_name = "thirdparty" } """ def generate_soft_libutil_configs(): return """ config("soft_libutil_configs") { visibility = [ ":*" ] include_dirs = [ "innerapis/includes", ] } ohos_prebuilt_static_library("soft_libutil") { public_configs = [":soft_libutil_configs",":musl_common_configs"] public_external_deps = [ ] source = "innerapis/libs/libutil.a" part_name = "musl" subsystem_name = "thirdparty" } """ def generate_soft_libxnet_configs(): return """ config("soft_libxnet_configs") { visibility = [ ":*" ] include_dirs = [ "innerapis/includes", ] } ohos_prebuilt_static_library("soft_libxnet") { public_configs = [":soft_libxnet_configs",":musl_common_configs"] public_external_deps = [ ] source = "innerapis/libs/libxnet.a" part_name = "musl" subsystem_name = "thirdparty" } """ def generate_group_copy_libs_block(): return """ group("copy_libs") { lib_files = [ "libc.a", "libc.so", "libcrypt.a", "libdl.a", "libm.a", "libpthread.a", "libresolv.a", "librt.a", "libutil.a", "libxnet.a", "crtn.o", "crti.o", "crt1.o", "rcrt1.o", "Scrt1.o", ] sources = [] outputs = [] deps = [] foreach(file, lib_files) { copy("copy_${file}") { sources += [ "innerapis/libs/${file}" ] outputs += [ "${target_out_dir}/usr/lib/arm-linux-ohos/${file}" ] } deps += [ ":copy_${file}" ] } } group("soft_shared_libs") { public_configs = [":musl_common_configs", ":soft_libxnet_configs"] deps = [ ":soft_libc_musl_shared", ":soft_libcrypt", ":soft_libdl", ":soft_libm", ":soft_libpthread", ":soft_libresolv", ":soft_librt", ":soft_libutil", ":soft_libxnet", ] } """ def generate_group_musl_headers_block(): return """ group("musl_headers") { public_deps = [ ":musl_common_configs", ] } """ def generate_gn_file_content(part_data): gn_content = [] gn_content.append(generate_common_configs()) gn_content.append(generate_soft_libc_musl_shared_configs()) gn_content.append(generate_soft_libcrypt_configs()) gn_content.append(generate_soft_libdl_configs()) gn_content.append(generate_soft_libm_configs()) gn_content.append(generate_soft_libpthread_configs()) gn_content.append(generate_soft_libresolv_configs()) gn_content.append(generate_soft_librt_configs()) gn_content.append(generate_soft_libutil_configs()) gn_content.append(generate_soft_libxnet_configs()) gn_content.append(generate_group_copy_libs_block()) gn_content.append(generate_group_musl_headers_block()) return '\n'.join(gn_content) def write_gn_file(gn_path, content): with open(gn_path, 'w') as gn_file: gn_file.write(content) def copy_musl_libs_includes(musl_obj_path, innerapi_target_path): for folder_name in ['include', 'lib']: src_folder_path = os.path.join(musl_obj_path, folder_name, 'arm-linux-ohos') dst_folder_path = os.path.join(innerapi_target_path, folder_name + 's') dst_folder_path_1 = os.path.join(innerapi_target_path, 'musl_headers', folder_name + 's') dst_folder_path_2 = os.path.join(innerapi_target_path, 'soft_libc_musl_static', folder_name + 's') dst_folder_path_3 = os.path.join(innerapi_target_path, 'soft_shared_libs', folder_name + 's') if os.path.exists(src_folder_path): shutil.copytree(src_folder_path, dst_folder_path, dirs_exist_ok=True) shutil.copytree(src_folder_path, dst_folder_path_1, dirs_exist_ok=True) shutil.copytree(src_folder_path, dst_folder_path_2, dirs_exist_ok=True) shutil.copytree(src_folder_path, dst_folder_path_3, dirs_exist_ok=True) def process_musl(part_data, parts_path_info, part_name, subsystem_name, components_json): musl_obj_path = os.path.join(part_data.get('out_path'), 'obj', 'third_party', 'musl', 'usr') musl_dst_path = os.path.join(part_data.get('out_path'), 'component_package', 'third_party', 'musl') musl_src_path = os.path.join(part_data.get('root_path'), 'third_party', 'musl') create_directories([musl_dst_path]) # Copy necessary files to the musl destination path files_to_copy = [ 'configure', 'dynamic.list', 'libc.map.txt', 'musl_config.gni' ] copy_files([(os.path.join(musl_src_path, file_name), os.path.join(musl_dst_path, file_name)) for file_name in files_to_copy]) # Generate and write the GN file gn_path = os.path.join(musl_dst_path, 'BUILD.gn') gn_content = generate_gn_file_content(part_data) write_gn_file(gn_path, gn_content) innerapi_target_path = os.path.join(musl_dst_path, 'innerapis') copy_musl_libs_includes(musl_obj_path, innerapi_target_path) modules = _parse_module_list(part_data) print('modules', modules) if len(modules) == 0: return _public_deps_list = [] # ... Additional logic for processing modules, copying docs, and finishing the component build ... _copy_required_docs(part_data, _public_deps_list) _finish_component_build(part_data) def _create_bundle_json(bundle_path, bundle_content): bundle = {} with open(bundle_path, "w", encoding="utf-8") as f1: json.dump(bundle_content, f1, indent=2) def _generate_rust_bundle_content(): bundle_content = { "name": "@ohos/rust", "description": "third party rust tools, provide multiply functions about compiler", "version": "3.1.0-snapshot", "license": "Apache License 2.0", "publishAs": "binary", "segment": {"destPath": "third_party/rust/crates"}, "dirs": {"./": ["*"]}, "scripts": {}, "component": { "name": "rust", "subsystem": "thirdparty", "syscap": [], "features": [], "adapted_system_type": ["standard"], "rom": "", "ram": "", "hisysevent_config": [], "deps": { "components": [], "third_party": [] }, "build": { "sub_component": [], "inner_api": [], "test": [] } }, "os": "linux", "buildArch": "x86" } return bundle_content def process_rust(part_data, parts_path_info, part_name, subsystem_name, components_json): rust_src_path = os.path.join(part_data.get('root_path'), 'third_party', 'rust', 'crates') dst_path = os.path.join(part_data.get("out_path"), "component_package", part_data.get("part_path")) copy_directory_contents(rust_src_path, dst_path) gn_path = os.path.join(dst_path, "bundle.json") bundle_content = _generate_rust_bundle_content() _create_bundle_json(gn_path, bundle_content) _copy_license(part_data) _copy_readme(part_data) def copy_directory_contents(src_path, dst_path): if not os.path.exists(dst_path): os.makedirs(dst_path) for item in os.listdir(src_path): src = os.path.join(src_path, item) dst = os.path.join(dst_path, item) if os.path.isdir(src): if os.path.exists(dst): shutil.rmtree(dst) shutil.copytree(src, dst) elif os.path.isfile(src): shutil.copy2(src, dst) def generate_developer_test_bundle_base_info(): return { "name": "@ohos/developer_test", "description": "developer_test", "version": "3.1.0-snapshot", "license": "Apache License 2.0", "publishAs": "binary", "segment": {"destPath": "test/testfwk/developer_test"}, "repository": "", "dirs": {"./": ["*"]}, "scripts": {}, "os": "linux", "buildArch": "x86" } def generate_developer_test_component_info(): return { "name": "developer_test", "subsystem": "testfwk", "syscap": [], "features": [], "adapted_system_type": ["mini", "small", "standard"], "rom": "0KB", "ram": "0KB", "deps": {} } def generate_developer_test_build_info(): return { "sub_component": [ "//test/testfwk/developer_test/examples/app_info:app_info", "//test/testfwk/developer_test/examples/detector:detector", "//test/testfwk/developer_test/examples/calculator:calculator", "//test/testfwk/developer_test/examples/calculator:calculator_static" ], "inner_kits": [ { "name": "//test/testfwk/developer_test/aw/cxx/distributed:distributedtest_lib", "header": { "header_base": [ "//test/testfwk/developer_test/aw/cxx/distributed/utils", "//test/testfwk/developer_test/aw/cxx/distributed" ], "header_files": [ "csv_transform_xml.h", "distributed.h", "distributed_agent.h", "distributed_cfg.h", "distributed_major.h" ] } }, { "name": "//test/testfwk/developer_test/aw/cxx/hwext:performance_test_static", "header": { "header_base": "//test/testfwk/developer_test/aw/cxx/hwext", "header_files": "perf.h" } } ], "test": [ "//test/testfwk/developer_test/examples/app_info/test:unittest", "//test/testfwk/developer_test/examples/calculator/test:unittest", "//test/testfwk/developer_test/examples/calculator/test:fuzztest", "//test/testfwk/developer_test/examples/calculator/test:benchmarktest", "//test/testfwk/developer_test/examples/detector/test:unittest", "//test/testfwk/developer_test/examples/sleep/test:performance", "//test/testfwk/developer_test/examples/distributedb/test:distributedtest", "//test/testfwk/developer_test/examples/stagetest/actsbundlemanagerstagetest:unittest" ] } def _generate_developer_test_bundle_content(): bundle_content = generate_developer_test_bundle_base_info() component_info = generate_developer_test_component_info() build_info = generate_developer_test_build_info() bundle_content["component"] = component_info component_info["build"] = build_info return bundle_content def process_developer_test(part_data, parts_path_info, part_name, subsystem_name, components_json): developer_test_src_path = os.path.join(part_data.get('root_path'), 'test', 'testfwk', 'developer_test') dst_path = os.path.join(part_data.get("out_path"), "component_package", part_data.get("part_path")) copy_directory_contents(developer_test_src_path, dst_path) gn_path = os.path.join(dst_path, "bundle.json") bundle_content = _generate_developer_test_bundle_content() _create_bundle_json(gn_path, bundle_content) _copy_license(part_data) _copy_readme(part_data) _finish_component_build(part_data) def write_hilog_gn(part_data, module): gn_path = os.path.join(part_data.get("out_path"), "component_package", part_data.get("part_path"), "innerapis", module, "BUILD.gn") if os.path.exists(gn_path): os.remove(gn_path) fd = os.open(gn_path, os.O_WRONLY | os.O_CREAT, mode=0o640) fp = os.fdopen(fd, 'w') fp.write("""import("//build/ohos.gni") config("hilog_rust_configs") { visibility = [ ":*" ] include_dirs = [ "includes", ] } ohos_rust_shared_library("hilog_rust") { sources = [ "src/lib.rs" ] deps = [ "../libhilog:libhilog" ] crate_name = "hilog_rust" crate_type = "dylib" rustflags = [ "-Zstack-protector=all" ] subsystem_name = "hiviewdfx" part_name = "hilog" }""") print("_generate_build_gn has done ") fp.close() def _hilog_rust_handle(part_data, module, components_json): public_deps_list = [] if not _is_innerkit(components_json, part_data.get("part_name"), module): return public_deps_list json_data = _get_json_data(part_data, module) _lib_special_handler(part_data.get("part_name"), module, part_data) lib_exists = _copy_lib(part_data, json_data, module) if lib_exists is False: return public_deps_list includes = _handle_includes_data(json_data) deps = _handle_deps_data(json_data) _copy_includes(part_data, module, includes) _list = _generate_build_gn(part_data, module, json_data, deps, components_json, public_deps_list) write_hilog_gn(part_data, module) _toolchain_gn_copy(part_data, module, json_data['out_name']) hilog_rust_out = os.path.join(part_data.get("out_path"), "component_package", part_data.get("part_path"), "innerapis", module) hilog_rust_dir = os.path.join(part_data.get("root_path"), part_data.get("part_path"), "interfaces", "rust") folder_to_copy = os.path.join(hilog_rust_dir, "src") # 替换为实际的文件夹名称 file_to_copy = os.path.join(hilog_rust_dir, "Cargo.toml") # 替换为实际的文件名称 # 检查文件夹和文件是否存在 if os.path.exists(folder_to_copy) and os.path.exists(file_to_copy): # 复制文件夹 shutil.copytree(folder_to_copy, os.path.join(hilog_rust_out, os.path.basename(folder_to_copy))) # 复制文件 shutil.copy(file_to_copy, os.path.join(hilog_rust_out, os.path.basename(file_to_copy))) else: print("文件夹或文件不存在,无法复制。") return _list def process_hilog(part_data, parts_path_info, part_name, subsystem_name, components_json): # 只处理一个模块 # 处理分类B的逻辑 part_path = _get_parts_path(parts_path_info, part_name) if part_path is None: return part_data.update({"subsystem_name": subsystem_name, "part_name": part_name, "part_path": part_path}) modules = _parse_module_list(part_data) print('modules', modules) if len(modules) == 0: return is_component_build = False _public_deps_list = [] for module in modules: module_deps_list = _handle_module(part_data, components_json, module) if module == 'hilog_rust': _hilog_rust_handle(part_data, module, components_json) if module_deps_list: _public_deps_list.extend(module_deps_list) is_component_build = True if is_component_build: _copy_required_docs(part_data, _public_deps_list) _finish_component_build(part_data) def generate_hisysevent_gn(part_data, module): gn_path = os.path.join(part_data.get("out_path"), "component_package", part_data.get("part_path"), "innerapis", module, "BUILD.gn") if os.path.exists(gn_path): os.remove(gn_path) fd = os.open(gn_path, os.O_WRONLY | os.O_CREAT, mode=0o640) fp = os.fdopen(fd, 'w') fp.write("""import("//build/ohos.gni") ohos_rust_shared_library("hisysevent_rust") { sources = [ "src/lib.rs", "src/macros.rs", "src/sys_event.rs", "src/sys_event_manager.rs", "src/utils.rs", ] external_deps = [ "hisysevent:hisysevent_c_wrapper", "hisysevent:libhisysevent", "hisysevent:libhisyseventmanager", ] crate_name = "hisysevent" crate_type = "dylib" rustflags = [ "-Zstack-protector=all" ] part_name = "hisysevent" subsystem_name = "hiviewdfx" } """) print("_generate_build_gn has done ") fp.close() def _hisysevent_rust_handle(part_data, module, components_json): public_deps_list = [] if not _is_innerkit(components_json, part_data.get("part_name"), module): return public_deps_list json_data = _get_json_data(part_data, module) _lib_special_handler(part_data.get("part_name"), module, part_data) lib_exists = _copy_lib(part_data, json_data, module) if lib_exists is False: return public_deps_list includes = _handle_includes_data(json_data) deps = _handle_deps_data(json_data) _copy_includes(part_data, module, includes) _list = _generate_build_gn(part_data, module, json_data, deps, components_json, public_deps_list) generate_hisysevent_gn(part_data, module) _toolchain_gn_copy(part_data, module, json_data['out_name']) hisysevent_rust_out = os.path.join(part_data.get("out_path"), "component_package", part_data.get("part_path"), "innerapis", module) hisysevent_rust_dir = os.path.join(part_data.get("root_path"), part_data.get("part_path"), "interfaces", "innerkits", "rust") folder_to_copy = os.path.join(hisysevent_rust_dir, "src") # 替换为实际的文件夹名称 file_to_copy = os.path.join(hisysevent_rust_dir, "Cargo.toml") # 替换为实际的文件名称 # 检查文件夹和文件是否存在 if os.path.exists(folder_to_copy) and os.path.exists(file_to_copy): # 复制文件夹 shutil.copytree(folder_to_copy, os.path.join(hisysevent_rust_out, os.path.basename(folder_to_copy))) # 复制文件 shutil.copy(file_to_copy, os.path.join(hisysevent_rust_out, os.path.basename(file_to_copy))) else: print("文件夹或文件不存在,无法复制。") return _list def process_hisysevent(part_data, parts_path_info, part_name, subsystem_name, components_json): # 只处理一个模块 part_path = _get_parts_path(parts_path_info, part_name) if part_path is None: return part_data.update({"subsystem_name": subsystem_name, "part_name": part_name, "part_path": part_path}) modules = _parse_module_list(part_data) print('modules', modules) if len(modules) == 0: return is_component_build = False _public_deps_list = [] for module in modules: module_deps_list = _handle_module(part_data, components_json, module) if module == 'hisysevent_rust': _hisysevent_rust_handle(part_data, module, components_json) if module_deps_list: _public_deps_list.extend(module_deps_list) is_component_build = True if is_component_build: _copy_required_docs(part_data, _public_deps_list) _finish_component_build(part_data) def _generate_runtime_core_build_gn(): gn_path = os.path.join(args.get("out_path"), "component_package", args.get("part_path"), "innerapis", module, "BUILD.gn") fd = os.open(gn_path, os.O_WRONLY | os.O_CREAT, mode=0o640) fp = os.fdopen(fd, 'w') _generate_import(fp) _generate_configs(fp, module) _generate_prebuilt_shared_library(fp, json_data.get('type'), module) _generate_public_configs(fp, module) _list = _generate_public_deps(fp, module, deps, components_json, public_deps_list) _generate_other(fp, args, json_data, module) _generate_end(fp) print("_generate_build_gn has done ") fp.close() return _list def _handle_module_runtime_core(args, components_json, module): public_deps_list = [] if _is_innerkit(components_json, args.get("part_name"), module) == False: return public_deps_list json_data = _get_json_data(args, module) _lib_special_handler(args.get("part_name"), module, args) lib_exists = _copy_lib(args, json_data, module) if lib_exists is False: return public_deps_list includes = _handle_includes_data(json_data) deps = _handle_deps_data(json_data) _copy_includes(args, module, includes) _list = _generate_build_gn(args, module, json_data, deps, components_json, public_deps_list) _toolchain_gn_copy(args, module, json_data['out_name']) return _list def process_runtime_core(part_data, parts_path_info, part_name, subsystem_name, components_json): # 处理分类runtime_core的逻辑 part_path = _get_parts_path(parts_path_info, part_name) if part_path is None: return part_data.update({"subsystem_name": subsystem_name, "part_name": part_name, "part_path": part_path}) modules = _parse_module_list(part_data) print('modules', modules) if len(modules) == 0: return is_component_build = False _public_deps_list = [] for module in modules: module_deps_list = _handle_module_runtime_core(part_data, components_json, module) if module_deps_list: _public_deps_list.extend(module_deps_list) is_component_build = True if is_component_build: _copy_required_docs(part_data, _public_deps_list) _finish_component_build(part_data) def process_drivers_interface_display(part_data, parts_path_info, part_name, subsystem_name, components_json): part_path = _get_parts_path(parts_path_info, part_name) if part_path is None: return part_data.update({"subsystem_name": subsystem_name, "part_name": part_name, "part_path": part_path}) modules = _parse_module_list(part_data) print('modules', modules) if len(modules) == 0: return is_component_build = False _public_deps_list = [] for module in modules: module_deps_list = _handle_module(part_data, components_json, module) if module_deps_list: _public_deps_list.extend(module_deps_list) is_component_build = True lib_out_dir = os.path.join(part_data.get("out_path"), "component_package", part_data.get("part_path"), "innerapis", "display_commontype_idl_headers", "libs") if not os.path.exists(lib_out_dir): os.makedirs(lib_out_dir) file_path = os.path.join(lib_out_dir, 'libdisplay_commontype_idl_headers') with open(file_path, 'wb') as file: pass if is_component_build: _copy_required_docs(part_data, _public_deps_list) _finish_component_build(part_data) def process_drivers_interface_usb(part_data, parts_path_info, part_name, subsystem_name, components_json): part_path = _get_parts_path(parts_path_info, part_name) if part_path is None: return part_data.update({"subsystem_name": subsystem_name, "part_name": part_name, "part_path": part_path}) modules = _parse_module_list(part_data) print('modules', modules) if len(modules) == 0: return is_component_build = False _public_deps_list = [] for module in modules: module_deps_list = _handle_module(part_data, components_json, module) if module_deps_list: _public_deps_list.extend(module_deps_list) is_component_build = True lib_out_dir = os.path.join(part_data.get("out_path"), "component_package", part_data.get("part_path"), "innerapis", "usb_idl_headers_1.1", "libs") if not os.path.exists(lib_out_dir): os.makedirs(lib_out_dir) file_path = os.path.join(lib_out_dir, 'libusb_idl_headers_1.1') with open(file_path, 'wb') as file: pass if is_component_build: _copy_required_docs(part_data, _public_deps_list) _finish_component_build(part_data) def process_drivers_interface_ril(part_data, parts_path_info, part_name, subsystem_name, components_json): part_path = _get_parts_path(parts_path_info, part_name) if part_path is None: return part_data.update({"subsystem_name": subsystem_name, "part_name": part_name, "part_path": part_path}) modules = _parse_module_list(part_data) print('modules', modules) if len(modules) == 0: return is_component_build = False _public_deps_list = [] for module in modules: module_deps_list = _handle_module(part_data, components_json, module) if module_deps_list: _public_deps_list.extend(module_deps_list) is_component_build = True lib_out_dir = os.path.join(part_data.get("out_path"), "component_package", part_data.get("part_path"), "innerapis", "ril_idl_headers", "libs") if not os.path.exists(lib_out_dir): os.makedirs(lib_out_dir) file_path = os.path.join(lib_out_dir, 'libril_idl_headers') with open(file_path, 'wb') as file: pass if is_component_build: _copy_required_docs(part_data, _public_deps_list) _finish_component_build(part_data) # 函数映射字典 function_map = { 'musl': process_musl, "developer_test": process_developer_test, # 同rust "drivers_interface_display": process_drivers_interface_display, # 驱动的, 新建一个libs目录/ innerapi同名文件 "runtime_core": process_runtime_core, # 编译参数, 所有下面的innerapi的cflags都不 "drivers_interface_usb": process_drivers_interface_usb, # 同驱动 "drivers_interface_ril": process_drivers_interface_ril, # 同驱动 } def _process_part(args, parts_path_info, part_name, subsystem_name, components_json): # 使用映射字典来调用对应的函数 if part_name in function_map.keys(): function_map[part_name](args, parts_path_info, part_name, subsystem_name, components_json) else: print(f"没有找到处理分类{part_name}的函数。") def _check_label(public_deps, value): innerapis = value["innerapis"] for _innerapi in innerapis: if _innerapi: label = _innerapi.get("label") if public_deps == label: return label.split(':')[-1] continue return "" def _get_public_external_deps(data, public_deps): if not isinstance(data, dict): return "" for key, value in data.items(): if not isinstance(value, dict): continue _data = _check_label(public_deps, value) if _data: return f"{key}:{_data}" continue return "" def _is_innerkit(data, part, module): if not isinstance(data, dict): return False part_data = data.get(part) if not isinstance(part_data, dict): return False module_list = [] for i in part_data["innerapis"]: if i: module_list.append(i["name"]) if module in module_list: return True return False def _get_components_json(out_path): jsondata = "" json_path = os.path.join(out_path + "/build_configs/parts_info/components.json") with os.fdopen(os.open(json_path, os.O_RDWR | os.O_CREAT, stat.S_IWUSR | stat.S_IRUSR), 'r', encoding='utf-8') as f: try: jsondata = json.load(f) except Exception as e: print('--_get_components_json parse json error--') return jsondata def _handle_one_layer_json(json_key, json_data, desc_list): data_list = json_data.get(json_key) if isinstance(data_list, list) and len(json_data.get(json_key)) >= 1: desc_list.extend(data_list) else: desc_list.append(json_data.get(json_key)) def _get_external_public_config(_path, _config_name): py_args = _get_args() out_path = py_args.out_path _json_path = os.path.join(out_path, 'external_public_configs', _path, f'{_config_name}.json') try: with os.fdopen(os.open(_json_path, os.O_RDWR | os.O_CREAT, stat.S_IWUSR | stat.S_IRUSR), 'r', encoding='utf-8') as f: jsondata = json.load(f) except Exception as e: print('_json_path: ', _json_path) print('--_get_external_public_config parse json error--') return [] include_dirs = jsondata.get('include_dirs') return include_dirs def _handle_two_layer_json(json_key, json_data, desc_list): value_depth = len(json_data.get(json_key)) for i in range(value_depth): _label = json_data.get(json_key)[i].get('label') _include_dirs = json_data.get(json_key)[i].get('include_dirs') if _include_dirs: desc_list.extend(_include_dirs) else: full_path = _label.split('//')[-1] _path = full_path.split(':')[0] _config_name = full_path.split(':')[-1] _include_dirs = _get_external_public_config(_path, _config_name) if _include_dirs: desc_list.extend(_include_dirs) def _get_json_data(args, module): json_path = os.path.join(args.get("out_path"), args.get("subsystem_name"), args.get("part_name"), "publicinfo", module + ".json") with os.fdopen(os.open(json_path, os.O_RDWR | os.O_CREAT, stat.S_IWUSR | stat.S_IRUSR), 'r', encoding='utf-8') as f: try: file_content = f.read() jsondata = json.loads(file_content) except Exception as e: print(json_path) print('--_get_json_data parse json error--', e) return jsondata def _handle_deps_data(json_data): dep_list = [] if json_data.get('public_deps'): _handle_one_layer_json('public_deps', json_data, dep_list) return dep_list def _handle_includes_data(json_data): include_list = [] if json_data.get('public_configs'): _handle_two_layer_json('public_configs', json_data, include_list) if json_data.get('all_dependent_configs'): _handle_two_layer_json('all_dependent_configs', json_data, include_list) return include_list def _get_static_lib_path(args, json_data): label = json_data.get('label') split_label = label.split("//")[1].split(":")[0] real_static_lib_path = os.path.join(args.get("out_path"), "obj", split_label, json_data.get('out_name')) return real_static_lib_path def _copy_dir(src_path, target_path): if not os.path.isdir(src_path): return False for file in os.listdir(src_path): path = os.path.join(src_path, file) if os.path.isdir(path): if file.startswith("."): continue path1 = os.path.join(target_path, file) _copy_dir(path, path1) else: _, file_extension = os.path.splitext(file) if file_extension not in [".h", ".hpp", ".in", ".inc", ".inl"]: continue if not os.path.exists(target_path): os.makedirs(target_path) shutil.copy2(path, os.path.join(target_path, file)) return True def _copy_includes(args, module, includes: list): if module == 'ipc_single': includes = [ "//foundation/communication/ipc/interfaces/innerkits/ipc_core/include", "//foundation/communication/ipc/ipc/native/src/core/include", "//foundation/communication/ipc/ipc/native/src/mock/include", ] includes_out_dir = os.path.join(args.get("out_path"), "component_package", args.get("part_path"), "innerapis", module, "includes") for i in args.get("toolchain_info").keys(): toolchain_includes_out_dir = os.path.join(args.get("out_path"), "component_package", args.get("part_path"), "innerapis", module, i, "includes") toolchain_lib_out_dir = os.path.join(args.get("out_path"), "component_package", args.get("part_path"), "innerapis", module, i, "libs") if not os.path.exists(toolchain_includes_out_dir) and os.path.exists(toolchain_lib_out_dir): os.makedirs(toolchain_includes_out_dir) else: continue for include in includes: part_path = args.get("part_path") _sub_include = include.split(f"{part_path}/")[-1] split_include = include.split("//")[1] real_include_path = os.path.join(args.get("root_path"), split_include) if args.get('part_name') == 'libunwind': _out_dir = os.path.join(toolchain_includes_out_dir, _sub_include) _copy_dir(real_include_path, _out_dir) continue _copy_dir(real_include_path, toolchain_includes_out_dir) if not os.path.exists(includes_out_dir): os.makedirs(includes_out_dir) for include in includes: part_path = args.get("part_path") _sub_include = include.split(f"{part_path}/")[-1] split_include = include.split("//")[1] real_include_path = os.path.join(args.get("root_path"), split_include) if args.get('part_name') == 'libunwind': _out_dir = os.path.join(includes_out_dir, _sub_include) _copy_dir(real_include_path, _out_dir) continue _copy_dir(real_include_path, includes_out_dir) print("_copy_includes has done ") def _copy_toolchain_lib(file_name, root, _name, lib_out_dir): if not file_name.startswith('.') and file_name.startswith(_name): if not os.path.exists(lib_out_dir): os.makedirs(lib_out_dir) file = os.path.join(root, file_name) shutil.copy(file, lib_out_dir) def _toolchain_lib_handler(args, toolchain_path, _name, module, toolchain_name): lib_out_dir = os.path.join(args.get("out_path"), "component_package", args.get("part_path"), "innerapis", module, toolchain_name, "libs") if os.path.isfile(toolchain_path): if not os.path.exists(lib_out_dir): os.makedirs(lib_out_dir) shutil.copy(toolchain_path, lib_out_dir) else: for root, dirs, files in os.walk(toolchain_path): for file_name in files: _copy_toolchain_lib(file_name, root, _name, lib_out_dir) def _toolchain_static_file_path_mapping(subsystem_name, args, i): if subsystem_name == "thirdparty": subsystem_name = "third_party" toolchain_path = os.path.join(args.get("out_path"), i, 'obj', subsystem_name, args.get("part_name")) return toolchain_path def replace_default_toolchains_in_output(path, default_toolchain="ohos_clang_arm64/"): return path.replace(default_toolchain, "") def _copy_lib(args, json_data, module): so_path = "" lib_status = False _target_type = json_data.get('type') subsystem_name = args.get("subsystem_name") # 根据 type 字段和 module 选择正确的 so_path if _target_type == 'copy' and module == 'ipc_core': so_path = os.path.join(subsystem_name, args.get("part_name"), 'libipc_single.z.so') elif _target_type == "rust_library" or _target_type == "rust_proc_macro": # 选择包含 'lib.unstripped' 的路径 outputs = json_data.get('outputs', []) for output in outputs: if 'lib.unstripped' in output: output = replace_default_toolchains_in_output(output) so_path = output break # 如果没有找到包含 'lib.unstripped' 的路径,则选择最后一个路径 if not so_path and outputs: so_path = outputs[-1] else: # 对于非 rust_library 类型,选择不包含 'lib.unstripped' 的路径 outputs = json_data.get('outputs', []) for output in outputs: if '.unstripped' not in output: output = replace_default_toolchains_in_output(output) so_path = output break # 如果所有路径都包含 'lib.unstripped' 或者没有 outputs,则使用 out_name if not so_path: so_path = json_data.get('out_name') if so_path: lib_status = lib_status or copy_so_file(args, module, so_path) return lib_status def copy_so_file(args, module, so_path): lib_status = False out_path = args.get("out_path") so_path_with_out_path = os.path.join(out_path, so_path) lib_out_dir = os.path.join(out_path, "component_package", args.get("part_path"), "innerapis", module, "libs") if args.get("toolchain_info").keys(): for toolchain_name in args.get("toolchain_info").keys(): lib_out_dir_with_toolchain = os.path.join(args.get("out_path"), "component_package", args.get("part_path"), "innerapis", module, toolchain_name, "libs") so_path_with_toolchain = os.path.join(args.get("out_path"), toolchain_name, so_path) if toolchain_name in so_path : lib_status = _copy_file(so_path_with_out_path, lib_out_dir_with_toolchain) or lib_status elif os.path.isfile(so_path_with_toolchain): lib_status = _copy_file(so_path_with_toolchain, lib_out_dir_with_toolchain) or lib_status lib_status = _copy_file(so_path_with_out_path, lib_out_dir) or lib_status return lib_status def _copy_file(so_path, lib_out_dir): if not os.path.isfile(so_path): print("WARNING: {} is not a file!".format(so_path)) return False if not os.path.exists(lib_out_dir): os.makedirs(lib_out_dir) shutil.copy(so_path, lib_out_dir) return True def _dirs_handler(bundlejson_out): dirs = dict() dirs['./'] = [] directory = bundlejson_out for filename in os.listdir(directory): filepath = os.path.join(directory, filename) if os.path.isfile(filepath): dirs['./'].append(filename) else: dirs[filename] = [f"{filename}/*"] delete_list = ['LICENSE', 'README.md', 'README_zh.md', 'README_en.md', 'bundle.json'] for delete_txt in delete_list: if delete_txt in dirs['./']: dirs['./'].remove(delete_txt) if dirs['./'] == []: del dirs['./'] return dirs def _copy_bundlejson(args, public_deps_list): bundlejson_out = os.path.join(args.get("out_path"), "component_package", args.get("part_path")) print("bundlejson_out : ", bundlejson_out) if not os.path.exists(bundlejson_out): os.makedirs(bundlejson_out) bundlejson = os.path.join(args.get("root_path"), args.get("part_path"), "bundle.json") dependencies_dict = {} sorted_dict = {} for public_deps in public_deps_list: _public_dep_part_name = public_deps.split(':')[0] if _public_dep_part_name != args.get("part_name"): _public_dep = f"@{args.get('organization_name')}/{_public_dep_part_name}" dependencies_dict.update({_public_dep: "*"}) sorted_dict = dict(sorted(dependencies_dict.items())) if os.path.isfile(bundlejson): with open(bundlejson, 'r') as f: bundle_data = json.load(f) bundle_data['publishAs'] = 'binary' bundle_data.update({'os': args.get('os')}) bundle_data.update({'buildArch': args.get('buildArch')}) dirs = _dirs_handler(bundlejson_out) bundle_data['dirs'] = dirs bundle_data['version'] = str(bundle_data['version']) if bundle_data['version'] == '': bundle_data['version'] = '1.0.0' pattern = r'^(\d+)\.(\d+)(-[a-zA-Z]+)?$' # 正则表达式匹配a.b[-后缀]格式的字符串 match = re.match(pattern, bundle_data['version']) if match: a = match.group(1) b = match.group(2) suffix = match.group(3) if match.group(3) else "" bundle_data['version'] = f"{a}.{b}.0{suffix}" if args.get('build_type') in [0, 1]: bundle_data['version'] += '-snapshot' if args.get('organization_name'): _name_pattern = r'@(.*.)/' bundle_data['name'] = re.sub(_name_pattern, '@' + args.get('organization_name') + '/', bundle_data['name']) if bundle_data.get('scripts'): bundle_data.update({'scripts': {}}) if bundle_data.get('licensePath'): del bundle_data['licensePath'] if bundle_data.get('readmePath'): del bundle_data['readmePath'] bundle_data['dependencies'] = sorted_dict if os.path.isfile(os.path.join(bundlejson_out, "bundle.json")): os.remove(os.path.join(bundlejson_out, "bundle.json")) with os.fdopen(os.open(os.path.join(bundlejson_out, "bundle.json"), os.O_WRONLY | os.O_CREAT, mode=0o640), "w", encoding='utf-8') as fd: json.dump(bundle_data, fd, indent=4, ensure_ascii=False) def _copy_license(args): license_out = os.path.join(args.get("out_path"), "component_package", args.get("part_path")) print("license_out : ", license_out) if not os.path.exists(license_out): os.makedirs(license_out) license_file = os.path.join(args.get("root_path"), args.get("part_path"), "LICENSE") if os.path.isfile(license_file): shutil.copy(license_file, license_out) else: license_default = os.path.join(args.get("root_path"), "build", "LICENSE") shutil.copy(license_default, license_out) bundlejson_out = os.path.join(args.get("out_path"), "component_package", args.get("part_path"), 'bundle.json') with open(bundlejson_out, 'r') as f: bundle_data = json.load(f) bundle_data.update({"license": "Apache License 2.0"}) if os.path.isfile(bundlejson_out): os.remove(bundlejson_out) with os.fdopen(os.open(bundlejson_out, os.O_WRONLY | os.O_CREAT, mode=0o640), "w", encoding='utf-8') as fd: json.dump(bundle_data, fd, indent=4, ensure_ascii=False) def _copy_readme(args): readme_out = os.path.join(args.get("out_path"), "component_package", args.get("part_path")) print("readme_out : ", readme_out) if not os.path.exists(readme_out): os.makedirs(readme_out) readme = os.path.join(args.get("root_path"), args.get("part_path"), "README.md") readme_zh = os.path.join(args.get("root_path"), args.get("part_path"), "README_zh.md") readme_en = os.path.join(args.get("root_path"), args.get("part_path"), "README_en.md") readme_out_file = os.path.join(readme_out, "README.md") if os.path.isfile(readme): shutil.copy(readme, readme_out) elif os.path.isfile(readme_zh): shutil.copy(readme_zh, readme_out_file) elif os.path.isfile(readme_en): shutil.copy(readme_en, readme_out_file) else: try: with os.fdopen(os.open(readme_out_file, os.O_WRONLY | os.O_CREAT, mode=0o640), 'w') as fp: fp.write('READ.ME') except FileExistsError: pass def _generate_import(fp): fp.write('import("//build/ohos.gni")\n') def _gcc_flags_info_handle(json_data): def should_process_key(k): return k not in ["label", "include_dirs"] def process_config(config): result = {} for k, v in config.items(): if should_process_key(k): result.setdefault(k, []).extend(v) return result _flags_info = {} _public_configs = json_data.get('public_configs') if _public_configs: for config in _public_configs: config_info = process_config(config) for k, v in config_info.items(): _flags_info.setdefault(k, []).extend(v) return _flags_info def _generate_configs(fp, module, json_data, _part_name): fp.write('\nconfig("' + module + '_configs") {\n') fp.write(' visibility = [ ":*" ]\n') fp.write(' include_dirs = [\n') fp.write(' "includes",\n') if module == 'libunwind': fp.write(' "includes/libunwind-1.6.2",\n') fp.write(' "includes/libunwind-1.6.2/src",\n') fp.write(' "includes/libunwind-1.6.2/include",\n') fp.write(' "includes/libunwind-1.6.2/include/tdep-arm",\n') if module == 'ability_runtime': fp.write(' "includes/context",\n') fp.write(' "includes/app",\n') fp.write(' ]\n') if _part_name == 'runtime_core': fp.write(' }\n') return _flags_info = _gcc_flags_info_handle(json_data) if _flags_info: for k, _list in _flags_info.items(): fp.write(f' {k} = [\n') for j in _list: # 保留所有 \ 转义符号 j_escaped = j.replace('"', '\\"') fp.write(f' "{j_escaped}",\n') fp.write(' ]\n') fp.write(' }\n') def _generate_prebuilt_target(fp, target_type, module): if target_type == 'static_library': fp.write('ohos_prebuilt_static_library("' + module + '") {\n') elif target_type == 'executable': fp.write('ohos_prebuilt_executable("' + module + '") {\n') elif module != 'ipc_core' and (target_type == 'etc' or target_type == 'copy'): fp.write('ohos_prebuilt_etc("' + module + '") {\n') elif target_type == 'rust_library' or target_type == 'rust_proc_macro': fp.write('ohos_prebuilt_rust_library("' + module + '") {\n') else: fp.write('ohos_prebuilt_shared_library("' + module + '") {\n') def _generate_public_configs(fp, module): fp.write(f' public_configs = [":{module}_configs"]\n') # 目前特殊处理的依赖关系映射 _DEPENDENCIES_MAP = { ('bundle_framework', 'appexecfwk_core'): ["ability_base:want", "samgr:samgr_proxy"], ('graphic_surface', 'surface'): ["ipc:ipc_core"], ('netmanager_base', 'net_conn_manager_if'): ["ipc:ipc_core"], ('ability_base', 'want'): ["ipc:ipc_core", "json:nlohmann_json_static"], ('ability_base', 'session_info'): ["bundle_framework:appexecfwk_base"], ('window_manager', 'libdm'): ["graphic_2d:librender_service_base"], ('enterprise_device_management', 'edmservice_kits'): ["ability_base:want"], ('ability_runtime', 'appkit_native'): ["ets_runtime:libark_jsruntime"], ('samgr', 'samgr_proxy'): ["ipc:ipc_core"], ('napi', 'ace_napi'): ["ets_runtime:libark_jsruntime", "runtime_core:libarkfile_static", "runtime_core:libarkbase_static"], ('ability_runtime', 'ability_context_native'): ["eventhandler:libeventhandler"], ('ability_runtime', 'abilitykit_native'): ["ipc:ipc_napi", "ability_runtime:ability_manager", "eventhandler:libeventhandler"], ('ipc', 'ipc_core'): ["c_utils:utils"], ('ipc', 'ipc_single'): ["c_utils:utils"], ('graphic_2d', 'libcomposer'): ["eventhandler:libeventhandler"], ('ability_runtime', 'napi_common'): ["ability_runtime:runtime"], ('graphic_2d', 'librender_service_client'): ["graphic_2d:2d_graphics", "graphic_2d:libcomposer", "image_framework:pixelconvertadapter", "eventhandler:libeventhandler"], ('graphic_2d', 'librender_service_base'): ["opengles:libGLES"], ('graphic_2d', '2d_graphics'): ["skia:skia_canvaskit", "opengles:libGLES"], ('input', 'libmmi-client'): ["eventhandler:libeventhandler"], ('resource_schedule_service', 'ressched_client'): ["samgr:samgr_proxy"], ('relational_store', 'native_rdb'): ["c_utils:utils"], ('opengles', 'libGLES'): ["egl:libEGL"], ('ability_runtime', 'ability_manager'): ["bundle_framework:libappexecfwk_common"], ('ability_runtime', 'napi_common'): ["ability_runtime:runtime"], ('access_token', 'libnativetoken'): ["cJSON:cjson_static", "selinux_adapter:librestorecon"], ('bundle_framework', 'bundlemgr_mini'): ["bundle_framework:appexecfwk_base"], ('media_foundation', 'media_monitor_client'): ["samgr:samgr_proxy"], } def _public_deps_special_handler(module, args): _part_name = args.get('part_name') # 使用映射字典来获取依赖列表 return _DEPENDENCIES_MAP.get((_part_name, module), []) def _generate_public_deps(fp, module, deps: list, components_json, public_deps_list: list, args): fp.write(' public_external_deps = [\n') for dep in deps: public_external_deps = _get_public_external_deps(components_json, dep) if len(public_external_deps) > 0: fp.write(f""" "{public_external_deps}",\n""") public_deps_list.append(public_external_deps) for _public_external_deps in _public_deps_special_handler(module, args): fp.write(f""" "{_public_external_deps}",\n""") public_deps_list.append(_public_external_deps) fp.write(' ]\n') return public_deps_list def _generate_other(fp, args, json_data, module): outputs = json_data.get('outputs', []) if json_data.get('type') == 'copy' and module == 'ipc_core': so_name = 'libipc_single.z.so' else: so_name = json_data.get('out_name') for output in outputs: so_name = output.split('/')[-1] if json_data.get('type') == 'copy': fp.write(' copy_linkable_file = true \n') fp.write(' source = "libs/' + so_name + '"\n') fp.write(' part_name = "' + args.get("part_name") + '"\n') fp.write(' subsystem_name = "' + args.get("subsystem_name") + '"\n') def _generate_end(fp): fp.write('}') def convert_rustdeps_to_innerapi(dep, components_json): # 分割路径和模块名 dep_parts = dep.split(':', 1) if len(dep_parts) != 2: return False, "" # 格式不正确,不是 innerapi path, module = dep_parts path = path.lstrip('//') # 遍历 components.json 中的每个部件 for component, info in components_json.items(): if path.startswith(info['path']) and _check_dep_in_innerapi(info.get('innerapis', []), dep): return True, f"{component}:{module}" return False, "" def _check_dep_in_innerapi(innerapis, dep): for innerapi in innerapis: if innerapi['label'] == (dep.split('(')[0] if ('(' in dep) else dep): return True return False def _generate_rust_deps(fp, json_data, components_json): rust_deps = json_data.get("rust_deps") external_deps = [] for _dep in rust_deps: has_innerapi, innerapi = convert_rustdeps_to_innerapi(_dep, components_json) if has_innerapi: external_deps.append(innerapi) fp.write(' external_deps = [\n') for external_dep in external_deps: fp.write(f""" "{external_dep}",\n""") fp.write(' ]\n') def _copy_rust_crate_info(fp, json_data): fp.write(f' rust_crate_name = \"{json_data.get("rust_crate_name")}\"\n') fp.write(f' rust_crate_type = \"{json_data.get("rust_crate_type")}\"\n') def _generate_build_gn(args, module, json_data, deps: list, components_json, public_deps_list): gn_path = os.path.join(args.get("out_path"), "component_package", args.get("part_path"), "innerapis", module, "BUILD.gn") fd = os.open(gn_path, os.O_WRONLY | os.O_CREAT, mode=0o640) fp = os.fdopen(fd, 'w') _generate_import(fp) _generate_configs(fp, module, json_data, args.get('part_name')) _target_type = json_data.get('type') _generate_prebuilt_target(fp, _target_type, module) _generate_public_configs(fp, module) _list = _generate_public_deps(fp, module, deps, components_json, public_deps_list, args) if _target_type == "rust_library" or _target_type == "rust_proc_macro": _copy_rust_crate_info(fp, json_data) _generate_rust_deps(fp, json_data, components_json) _generate_other(fp, args, json_data, module) _generate_end(fp) print(f"{module}_generate_build_gn has done ") fp.close() return _list def _toolchain_gn_modify(gn_path, file_name, toolchain_gn_file): if os.path.isfile(gn_path) and file_name: with open(gn_path, 'r') as f: _gn = f.read() pattern = r"libs/(.*.)" toolchain_gn = re.sub(pattern, 'libs/' + file_name + '\"', _gn) fd = os.open(toolchain_gn_file, os.O_WRONLY | os.O_CREAT, mode=0o640) fp = os.fdopen(fd, 'w') fp.write(toolchain_gn) fp.close() def _get_toolchain_gn_file(lib_out_dir, out_name): if os.path.exists(os.path.join(lib_out_dir, out_name)): return out_name else: print('Output file not found in toolchain dir.') return '' def _toolchain_gn_copy(args, module, out_name): gn_path = os.path.join(args.get("out_path"), "component_package", args.get("part_path"), "innerapis", module, "BUILD.gn") for i in args.get("toolchain_info").keys(): lib_out_dir = os.path.join(args.get("out_path"), "component_package", args.get("part_path"), "innerapis", module, i, "libs") file_name = _get_toolchain_gn_file(lib_out_dir, out_name) if not file_name: continue toolchain_gn_file = os.path.join(args.get("out_path"), "component_package", args.get("part_path"), "innerapis", module, i, "BUILD.gn") if not os.path.exists(toolchain_gn_file): os.mknod(toolchain_gn_file) _toolchain_gn_modify(gn_path, file_name, toolchain_gn_file) def _parse_module_list(args): module_list = [] publicinfo_path = os.path.join(args.get("out_path"), args.get("subsystem_name"), args.get("part_name"), "publicinfo") print('publicinfo_path', publicinfo_path) if os.path.exists(publicinfo_path) is False: return module_list publicinfo_dir = os.listdir(publicinfo_path) for filename in publicinfo_dir: if filename.endswith(".json"): module_name = filename.split(".json")[0] module_list.append(module_name) print('filename', filename) print('module_list', module_list) return module_list def _lib_special_handler(part_name, module, args): if part_name == 'mksh': mksh_file_path = os.path.join(args.get('out_path'), 'startup', 'init', 'sh') sh_out = os.path.join(args.get("out_path"), "thirdparty", "mksh") if os.path.isfile(mksh_file_path): shutil.copy(mksh_file_path, sh_out) if module == 'blkid': blkid_file_path = os.path.join(args.get('out_path'), 'filemanagement', 'storage_service', 'blkid') blkid_out = os.path.join(args.get("out_path"), "thirdparty", "e2fsprogs") if os.path.isfile(blkid_file_path): shutil.copy(blkid_file_path, blkid_out) if module == 'grpc_cpp_plugin': blkid_file_path = os.path.join(args.get('out_path'), 'clang_x64', 'thirdparty', 'grpc', 'grpc_cpp_plugin') blkid_out = os.path.join(args.get("out_path"), "thirdparty", "grpc") if os.path.isfile(blkid_file_path): shutil.copy(blkid_file_path, blkid_out) def _handle_module(args, components_json, module): public_deps_list = [] if _is_innerkit(components_json, args.get("part_name"), module) == False: return public_deps_list json_data = _get_json_data(args, module) _lib_special_handler(args.get("part_name"), module, args) _copy_lib(args, json_data, module) includes = _handle_includes_data(json_data) deps = _handle_deps_data(json_data) _copy_includes(args, module, includes) _list = _generate_build_gn(args, module, json_data, deps, components_json, public_deps_list) _toolchain_gn_copy(args, module, json_data['out_name']) return _list def _copy_required_docs(args, public_deps_list): _copy_bundlejson(args, public_deps_list) _copy_license(args) _copy_readme(args) def _finish_component_build(args): if args.get("build_type") in [0, 1]: _hpm_status = _hpm_pack(args) if _hpm_status: _copy_hpm_pack(args) def _generate_component_package(args, components_json): modules = _parse_module_list(args) print('modules', modules) if len(modules) == 0: return is_component_build = False _public_deps_list = [] for module in modules: module_deps_list = _handle_module(args, components_json, module) if module_deps_list: _public_deps_list.extend(module_deps_list) is_component_build = True if is_component_build: _copy_required_docs(args, _public_deps_list) _finish_component_build(args) def _get_part_subsystem(components_json: dict): jsondata = dict() try: for component, v in components_json.items(): jsondata[component] = v.get('subsystem') except Exception as e: print('--_get_part_subsystem parse json error--') return jsondata def _get_parts_path_info(components_json): jsondata = dict() try: for component, v in components_json.items(): jsondata[component] = v.get('path') except Exception as e: print('--_get_part_subsystem parse json error--') return jsondata def _get_toolchain_info(root_path): jsondata = "" json_path = os.path.join(root_path + "/build/indep_configs/variants/common/toolchain.json") with os.fdopen(os.open(json_path, os.O_RDWR | os.O_CREAT, stat.S_IWUSR | stat.S_IRUSR), 'r', encoding='utf-8') as f: try: jsondata = json.load(f) except Exception as e: print('--_get_toolchain_info parse json error--') return jsondata def _get_parts_path(json_data, part_name): parts_path = None if json_data.get(part_name) is not None: parts_path = json_data[part_name] return parts_path def _hpm_pack(args): part_path = os.path.join(args.get("out_path"), "component_package", args.get("part_path")) cmd = ['hpm', 'pack'] try: subprocess.run(cmd, shell=False, cwd=part_path) except Exception as e: print("{} pack fail".format(args.get("part_name"))) return 0 print("{} pack succ".format(args.get("part_name"))) return 1 def _copy_hpm_pack(args): hpm_packages_path = args.get('hpm_packages_path') part_path = os.path.join(args.get("out_path"), "component_package", args.get("part_path")) dirs = os.listdir(part_path) tgz_file_name = '' for file in dirs: if file.endswith(".tgz"): tgz_file_name = file tgz_file_out = os.path.join(part_path, tgz_file_name) if tgz_file_name: shutil.copy(tgz_file_out, hpm_packages_path) def _make_hpm_packages_dir(root_path): _out_path = os.path.join(root_path, 'out') hpm_packages_path = os.path.join(_out_path, 'hpm_packages') os.makedirs(hpm_packages_path, exist_ok=True) return hpm_packages_path def _del_exist_component_package(out_path): _component_package_path = os.path.join(out_path, 'component_package') if os.path.isdir(_component_package_path): try: print('del dir component_package start..') shutil.rmtree(_component_package_path) print('del dir component_package end..') except Exception as e: print('del dir component_package FAILED') def _get_component_check(local_test) -> list: check_list = [] if local_test == 0: contents = urllib.request.urlopen( "https://ci.openharmony.cn/api/daily_build/component/check/list").read().decode( encoding="utf-8") _check_json = json.loads(contents) try: check_list.extend(_check_json["data"]["dep_list"]) check_list.extend(_check_json["data"]["indep_list"]) except Exception as e: print("Call the component check API something wrong, plz check the API return..") check_list = list(set(check_list)) check_list = sorted(check_list) return check_list def _package_interface(args, parts_path_info, part_name, subsystem_name, components_json): part_path = _get_parts_path(parts_path_info, part_name) if part_path is None: return args.update({"subsystem_name": subsystem_name, "part_name": part_name, "part_path": part_path}) if part_name in [ "musl", # 从obj/third_party/musl/usr 下提取到includes和libs "developer_test", # 同rust "drivers_interface_display", # 驱动的, 新建一个libs目录/ innerapi同名文件 "runtime_core", # 编译参数, 所有下面的innerapi的cflags都不 "drivers_interface_usb", # 同驱动 "drivers_interface_ril", # 同驱动 ]: _process_part(args, parts_path_info, part_name, subsystem_name, components_json) else: _generate_component_package(args, components_json) def _get_exclusion_list(root_path): part_black_list_path = os.path.join(root_path, "build", "indep_configs", "config", "binary_package_exclusion_list.json") data = [] try: with open(part_black_list_path, 'r') as f: data = json.load(f) except FileNotFoundError: print(f"can not find file: {part_black_list_path}.") except Exception as e: print(f"{part_black_list_path}: \n {e}") return data def generate_component_package(out_path, root_path, components_list=None, build_type=0, organization_name='ohos', os_arg='linux', build_arch_arg='x86', local_test=0): """ Args: out_path: output path of code default : out/rk3568 root_path: root path of code default : oh/ components_list: list of all components that need to be built build_type: build type 0: default pack,do not change organization_name 1: pack ,change organization_name 2: do not pack,do not change organization_name organization_name: default ohos, if diff then change os_arg: default : linux build_arch_arg: default : x86 local_test: 1 to open local test , default 0 to close Returns: """ start_time = time.time() components_json = _get_components_json(out_path) components_json.update({"rust": { "innerapis": [], "path": "third_party/rust", "subsystem": "thirdparty", "variants": [] }, "developer_test": { "innerapis": [], "path": "test/testfwk/developer_test", "subsystem": "testfwk", "variants": [] } }) part_subsystem = _get_part_subsystem(components_json) parts_path_info = _get_parts_path_info(components_json) hpm_packages_path = _make_hpm_packages_dir(root_path) toolchain_info = _get_toolchain_info(root_path) ### add exclusion_list = _get_exclusion_list(root_path) # 黑名单列表 # 如果没有提供 components_list,则默认为所有非黑名单组件 all_components = components_json.keys() if local_test == 1: components_list = components_list.split(",") if components_list else all_components elif local_test == 0: if components_list: components_list = [component for component in components_list.split(",") if component not in exclusion_list] else: components_list = [component for component in all_components if component not in exclusion_list] # 如果 components_list 为空,则退出程序 if not components_list: sys.exit("stop for no target to pack..") print('components_list', components_list) # del component_package _del_exist_component_package(out_path) args = {"out_path": out_path, "root_path": root_path, "os": os_arg, "buildArch": build_arch_arg, "hpm_packages_path": hpm_packages_path, "build_type": build_type, "organization_name": organization_name, "toolchain_info": toolchain_info } for key, value in part_subsystem.items(): part_name = key subsystem_name = value if not components_list or part_name in components_list: _package_interface(args, parts_path_info, part_name, subsystem_name, components_json) end_time = time.time() run_time = end_time - start_time print("generate_component_package out_path", out_path) print(f"Generating binary product package takes time:{run_time}") def main(): py_args = _get_args() generate_component_package(py_args.out_path, py_args.root_path, components_list=py_args.components_list, build_type=py_args.build_type, organization_name=py_args.organization_name, os_arg=py_args.os_arg, build_arch_arg=py_args.build_arch, local_test=py_args.local_test) if __name__ == '__main__': main()