• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2023 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import sys
21
22
23def _init_sys_config():
24    sys.localcoverage_path = os.path.join(current_path, "..")
25    sys.path.insert(0, sys.localcoverage_path)
26
27
28def get_subsystem_name(partname: str) -> str:
29    """
30    获取部件所属的子系统名
31    :param partname:
32    :return: 子系统名
33    """
34    parts_info_json = os.path.join(out_path, "build_configs", "parts_info", "part_subsystem.json")
35    if not os.path.exists(parts_info_json):
36        logger("{} not exists.".format(parts_info_json), "ERROR")
37        return ""
38    json_obj = json_parse(parts_info_json)
39    if json_obj:
40        if partname not in json_obj:
41            logger("{} part not exist in {}".format(partname, parts_info_json), "ERROR")
42            return ""
43        return json_obj[partname]
44    return ""
45
46
47def find_part_so_dest_path(test_part: str) -> str:
48    """
49    获取指定部件的obj目录
50    :param test_part:部件名称
51    :return:部件obj目录
52    """
53    parts_info_json = os.path.join(out_path, "build_configs", "parts_info", "parts_path_info.json")
54    if not os.path.exists(parts_info_json):
55        logger("{} not exists.".format(parts_info_json), "ERROR")
56        return ""
57    json_obj = json_parse(parts_info_json)
58    if json_obj:
59        if test_part not in json_obj:
60            logger("{} part not exist in {}.".format(test_part, parts_info_json), "ERROR")
61            return ""
62        path = os.path.join(out_path, "obj", json_obj[test_part])
63        return path
64
65    return ""
66
67
68def find_subsystem_so_dest_path(sub_system: str) -> list:
69    """
70    获取指定子系统的obj目录
71    :param sub_system:子系统名
72    :return: 子系统下所有部件obj目录的列表
73    """
74    subsystem_config_json = os.path.join(out_path, "build_configs", "subsystem_info", "subsystem_build_config.json")
75    if not os.path.exists(subsystem_config_json):
76        logger("{} not exists.".format(subsystem_config_json), "ERROR")
77        return []
78
79    json_obj = json_parse(subsystem_config_json)
80    if json_obj:
81        if sub_system not in json_obj["subsystem"]:
82            logger("{} not exist in subsystem_build_config.json".format(sub_system), "ERROR")
83            return []
84        if "path" not in json_obj["subsystem"][sub_system]:
85            logger("{} no path in subsystem_build_config.json".format(sub_system), "ERROR")
86            return []
87
88        path = list()
89        for s in json_obj["subsystem"][sub_system]["path"]:
90            path.append(os.path.join(out_path, "obj", s))
91        return path
92
93    return []
94
95
96def find_so_source_dest(path: str, subsystem_name: str) -> dict:
97    """
98    获取so和设备里所在目录的对应关系
99    :param path: 子系统obj目录
100    :return: so和设备里所在目录的对应关系dict
101    """
102    so_dict = dict()
103    json_list = list()
104    if not path:
105        return {}
106
107    json_list = tree_find_file_endswith(path, "_module_info.json", json_list)
108
109    for j in json_list:
110        json_obj = json_parse(j)
111        if "subsystem_name" not in json_obj:
112            continue
113        if json_obj["subsystem_name"] != subsystem_name:
114            continue
115        if "source" not in json_obj or "dest" not in json_obj:
116            logger("{} json file error.".format(j), "ERROR")
117            return {}
118
119        source_path = os.path.join(out_path, json_obj["source"])
120        if source_path.endswith(".so"):
121            so_dict[source_path] = json_obj["dest"]
122
123    return so_dict
124
125
126def push_coverage_so(so_dict: dict):
127    """
128    推送so到设备
129    :param so_dict: so和设备里目录对应dict
130    :return:
131    """
132    if not so_dict:
133        logger("No coverage so to push.", "INFO")
134        return
135    for device in device_sn_list:
136        cmd = "shell mount -o rw,remount /"
137        hdc_command(device_ip, device_port, device, cmd)
138        for source_path, dest_paths in so_dict.items():
139            if not os.path.exists(source_path):
140                logger("{} not exist.".format(source_path), "ERROR")
141                continue
142            for dest_path in dest_paths:
143                full_dest = os.path.join("/", dest_path)
144                command = "file send {} {}".format(source_path, full_dest)
145                hdc_command(device_ip, device_port, device, command)
146
147
148if __name__ == "__main__":
149    current_path = os.path.abspath(os.path.dirname(__name__))
150
151    _init_sys_config()
152    from localCoverage.resident_service.public_method import get_config_ip, get_sn_list
153    from localCoverage.utils import get_product_name, hdc_command, tree_find_file_endswith,\
154                                    json_parse, logger, is_elffile
155
156    root_path = current_path.split("/test/testfwk/developer_test")[0]
157    out_path = os.path.join(root_path, "out", get_product_name(root_path))
158    developer_path = os.path.join(root_path, "test", "testfwk", "developer_test")
159
160    # 获取远程映射相关hdc参数
161    device_ip, device_port, device_sn_strs = get_config_ip(os.path.join(developer_path, "config", "user_config.xml"))
162    if not device_port:
163        device_port = "8710"
164    if not device_sn_strs:
165        device_sn_list = get_sn_list("hdc -s {}:{} list targets".format(device_ip, device_port))
166    else:
167        device_sn_list = device_sn_strs.split(";")
168
169    subsystem_list, testpart_list = [], []
170    testtype = sys.argv[1]
171    param_list = sys.argv[2:]
172
173    # 入参为ss子系统和tp部件分别处理
174    if testtype == "testpart":
175        for param in param_list:
176            testpart_list.append(param.strip("[").strip("]").strip(","))
177            for testpart in testpart_list:
178                json_path_list = find_part_so_dest_path(testpart)
179                subsystem = get_subsystem_name(testpart)
180                source_dest_dict = find_so_source_dest(json_path_list, subsystem)
181                push_coverage_so(source_dest_dict)
182    else:
183        for param in param_list:
184            subsystem_list.append(param.strip("[").strip("]").strip(","))
185
186            if len(subsystem_list) == 1:
187                subsystem = subsystem_list[0]
188                json_path_list = find_subsystem_so_dest_path(subsystem)
189                for json_path in json_path_list:
190                    source_dest_dict = find_so_source_dest(json_path, subsystem)
191                    push_coverage_so(source_dest_dict)
192