• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2023 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import sys
21
22
def _init_sys_config():
    """
    Make the parent directory of ``current_path`` importable.

    Reads the module-level ``current_path``, stores ``<current_path>/..`` on
    ``sys.localcoverage_path`` and inserts that same value at the front of
    ``sys.path``.
    """
    parent_dir = os.path.join(current_path, "..")
    sys.localcoverage_path = parent_dir
    sys.path.insert(0, parent_dir)
26
27
def get_subsystem_name(partname: str) -> str:
    """
    Look up the name of the subsystem that a part belongs to.

    The mapping is read from ``build_configs/parts_info/part_subsystem.json``
    under the module-level ``out_path``.

    :param partname: name of the part to look up
    :return: the subsystem name; "" when the mapping file does not exist,
             parses to a falsy value, or has no entry for ``partname``
    """
    mapping_file = os.path.join(out_path, "build_configs", "parts_info", "part_subsystem.json")
    if not os.path.exists(mapping_file):
        logger("{} not exists.".format(mapping_file), "ERROR")
        return ""

    part_to_subsystem = json_parse(mapping_file)
    if not part_to_subsystem:
        return ""

    if partname in part_to_subsystem:
        return part_to_subsystem[partname]

    logger("{} part not exist in {}".format(partname, mapping_file), "ERROR")
    return ""
45
46
def find_part_so_dest_path(test_part: str) -> str:
    """
    Return the obj directory of the given part.

    The part's relative directory is read from
    ``build_configs/parts_info/parts_path_info.json`` under the module-level
    ``out_path`` and joined onto ``<out_path>/obj``.

    :param test_part: name of the part
    :return: the part's obj directory; "" when the info file does not exist,
             parses to a falsy value, or has no entry for ``test_part``
    """
    path_info_file = os.path.join(out_path, "build_configs", "parts_info", "parts_path_info.json")
    if not os.path.exists(path_info_file):
        logger("{} not exists.".format(path_info_file), "ERROR")
        return ""

    part_dirs = json_parse(path_info_file)
    if not part_dirs:
        return ""

    if test_part not in part_dirs:
        logger("{} part not exist in {}.".format(test_part, path_info_file), "ERROR")
        return ""

    return os.path.join(out_path, "obj", part_dirs[test_part])
66
67
def find_subsystem_so_dest_path(sub_system: str) -> list:
    """
    Return the obj directories of the given subsystem.

    The subsystem's relative paths are read from
    ``build_configs/subsystem_info/subsystem_build_config.json`` under the
    module-level ``out_path``; each one is joined onto ``<out_path>/obj``.

    :param sub_system: name of the subsystem
    :return: list of obj directories, one per entry in the subsystem's
             "path" list; [] when the config file does not exist, parses to
             a falsy value, or lacks the subsystem or its "path" entry
    """
    subsystem_config_json = os.path.join(out_path, "build_configs", "subsystem_info", "subsystem_build_config.json")
    if not os.path.exists(subsystem_config_json):
        logger("{} not exists.".format(subsystem_config_json), "ERROR")
        return []

    json_obj = json_parse(subsystem_config_json)
    if not json_obj:
        return []

    # Read the top-level "subsystem" table defensively: a config without it
    # is reported like any other missing subsystem instead of raising
    # KeyError.
    subsystems = json_obj.get("subsystem") or {}
    if sub_system not in subsystems:
        logger("{} not exist in subsystem_build_config.json".format(sub_system), "ERROR")
        return []

    subsystem_info = subsystems[sub_system]
    if "path" not in subsystem_info:
        logger("{} no path in subsystem_build_config.json".format(sub_system), "ERROR")
        return []

    return [os.path.join(out_path, "obj", s) for s in subsystem_info["path"]]
94
95
def find_so_source_dest(path: str, subsystem_name: str) -> dict:
    """
    Map each built .so to the on-device locations it is installed to.

    Every ``*_module_info.json`` file found under ``path`` is parsed. Entries
    whose "subsystem_name" equals ``subsystem_name`` and whose "source" ends
    with ".so" contribute one item to the result.

    :param path: obj directory to search; a falsy value yields {}
    :param subsystem_name: only module info files declaring this subsystem
                           are considered
    :return: dict mapping ``<out_path>/<source>`` to the list of "dest"
             entries that start with "system/" or "vendor/"; {} as soon as a
             matching module info file lacks "source" or "dest"
    """
    if not path:
        return {}

    so_dict = {}
    for module_info_file in tree_find_file_endswith(path, "_module_info.json", []):
        json_obj = json_parse(module_info_file)
        # Same falsy guard the other lookups in this file apply to
        # json_parse results: skip files that yielded nothing usable.
        if not json_obj:
            continue
        if "subsystem_name" not in json_obj:
            continue
        if json_obj["subsystem_name"] != subsystem_name:
            continue
        if "source" not in json_obj or "dest" not in json_obj:
            # A malformed matching file aborts the whole lookup.
            logger("{} json file error.".format(module_info_file), "ERROR")
            return {}

        source_path = os.path.join(out_path, json_obj["source"])
        if source_path.endswith(".so"):
            so_dict[source_path] = [
                dest for dest in json_obj["dest"]
                if dest.startswith(("system/", "vendor/"))
            ]

    return so_dict
125
126
def push_coverage_so(so_dict: dict):
    """
    Push the coverage .so files to every device in ``device_sn_list``.

    For each device the root filesystem is first remounted read-write via
    hdc; then every source file that exists locally is sent to each of its
    destination paths (prefixed with "/").

    :param so_dict: dict mapping a local .so path to the list of its
                    destination paths on the device
    :return: None
    """
    if not so_dict:
        logger("No coverage so to push.", "INFO")
        return

    remount_cmd = "shell mount -o rw,remount /"
    for device in device_sn_list:
        hdc_command(device_ip, device_port, device, remount_cmd)
        for local_so, targets in so_dict.items():
            if os.path.exists(local_so):
                for target in targets:
                    device_target = os.path.join("/", target)
                    send_cmd = "file send {} {}".format(local_so, device_target)
                    hdc_command(device_ip, device_port, device, send_cmd)
            else:
                logger("{} not exist.".format(local_so), "ERROR")
147
148
if __name__ == "__main__":
    # NOTE: inside this guard __name__ is "__main__", so dirname() yields ""
    # and abspath("") resolves to the current working directory (not to the
    # directory containing this file).
    current_path = os.path.abspath(os.path.dirname(__name__))

    # Must run before the localCoverage imports below: it puts the parent of
    # current_path on sys.path.
    _init_sys_config()
    from localCoverage.resident_service.public_method import get_config_ip, get_sn_list
    from localCoverage.utils import get_product_name, hdc_command, tree_find_file_endswith,\
                                    json_parse, logger, is_elffile

    # Fail with a readable message instead of an IndexError traceback when
    # the test type argument is missing.
    if len(sys.argv) < 2:
        logger("Missing arguments: expected <test type> followed by part or subsystem names.", "ERROR")
        sys.exit(1)

    root_path = current_path.split("/test/testfwk/developer_test")[0]
    out_path = os.path.join(root_path, "out", get_product_name(root_path))
    developer_path = os.path.join(root_path, "test", "testfwk", "developer_test")

    # Read the hdc parameters used for the remote device mapping.
    device_ip, device_port, device_sn_strs = get_config_ip(os.path.join(developer_path, "config", "user_config.xml"))
    if not device_port:
        device_port = "8710"
    if not device_sn_strs:
        device_sn_list = get_sn_list("hdc -s {}:{} list targets".format(device_ip, device_port))
    else:
        device_sn_list = device_sn_strs.split(";")

    subsystem_list, testpart_list = [], []
    testtype = sys.argv[1]
    param_list = sys.argv[2:]

    # Names may arrive as pieces of a list literal ("[a,", "b]"); strip the
    # surrounding brackets and commas from each one.
    cleaned_names = [param.strip("[").strip("]").strip(",") for param in param_list]

    # Subsystem (ss) and test part (tp) inputs are handled separately.
    if testtype == "testpart":
        testpart_list.extend(cleaned_names)
        # Each part is processed exactly once. Previously this loop was
        # nested inside the loop that filled testpart_list, so earlier parts
        # were pushed again for every additional argument.
        for testpart in testpart_list:
            part_obj_path = find_part_so_dest_path(testpart)
            subsystem = get_subsystem_name(testpart)
            source_dest_dict = find_so_source_dest(part_obj_path, subsystem)
            push_coverage_so(source_dest_dict)
    else:
        subsystem_list.extend(cleaned_names)
        # Only the first subsystem is processed, as before.
        # NOTE(review): confirm whether additional subsystems should be
        # pushed as well.
        if subsystem_list:
            subsystem = subsystem_list[0]
            json_path_list = find_subsystem_so_dest_path(subsystem)
            for json_path in json_path_list:
                source_dest_dict = find_so_source_dest(json_path, subsystem)
                push_coverage_so(source_dest_dict)
193