• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import subprocess
21import sys
22import re
23import stat
24
25from xdevice import platform_logger
26from xdevice import EnvironmentManager
27from xdevice import ResultReporter
28from xdevice import ExecInfo
29
30FLAGS = os.O_WRONLY | os.O_CREAT | os.O_EXCL
31MODES = stat.S_IWUSR | stat.S_IRUSR
32
33LOG = platform_logger("distribute_utils")
34
35
36def make_device_info_file(tmp_path):
37    env_manager = EnvironmentManager()
38    device_info_file_path = os.path.join(tmp_path,
39                                         "device_info_file.txt")
40    if os.path.exists(device_info_file_path):
41        os.remove(device_info_file_path)
42    with os.fdopen(os.open(device_info_file_path, FLAGS, MODES), 'w') as file_handle:
43        if not env_manager.managers:
44            return
45        if list(env_manager.managers.values())[0].devices_list:
46            for device in list(env_manager.managers.values())[0].devices_list:
47                get_device_info(device, file_handle)
48        else:
49            for device in list(env_manager.managers.values())[1].devices_list:
50                get_device_info(device, file_handle)
51
52
53def get_device_info(device, file_handle):
54    """
55
56    :param device:
57    :param file_handle:
58    :return:
59    """
60    if device.test_device_state.value == "ONLINE":
61        status = device.label if device.label else 'None'
62        LOG.info("%s,%s" % (device.device_sn, status))
63        file_handle.write("%s,%s,%s,%s\n" % (
64            device.device_sn,
65            device.label if device.label else 'None',
66            device.host,
67            device.port))
68
69
70def make_reports(result_rootpath, start_time):
71    exec_info = ExecInfo()
72    exec_info.test_type = "systemtest"
73    exec_info.device_name = "ALL"
74    exec_info.host_info = ""
75    exec_info.test_time = start_time
76    exec_info.log_path = os.path.join(result_rootpath, "log")
77    exec_info.platform = "ALL"
78    exec_info.execute_time = ""
79
80    result_report = ResultReporter()
81    result_report.__generate_reports__(report_path=result_rootpath,
82                                       task_info=exec_info)
83    return True
84
85
86def check_ditributetest_environment():
87    env_manager = EnvironmentManager()
88    devices_list = list(env_manager.managers.values())[0].devices_list
89    if not devices_list:
90        devices_list = list(env_manager.managers.values())[1].devices_list
91    evn_status = True
92    if len(devices_list) == 0:
93        LOG.error("no devices online")
94        return False
95    device_zero = devices_list[0]
96    for device in devices_list:
97        if device != device_zero:
98            device_ip = query_device_ip(device)
99            if device_ip == "":
100                evn_status = False
101                continue
102            if not check_zdn_network(device, device_ip):
103                LOG.error("device_%s ping devices_%s failed" % (
104                    device_zero.device_sn, device.device_sn))
105                evn_status = False
106    if not evn_status:
107        LOG.error("Environment status = False, test end")
108        return False
109    LOG.info("Environment status == True")
110    return True
111
112
113def check_zdn_network(device, device_ip=""):
114    command = "ping -c 3 " + device_ip
115    output = device.execute_shell_command(command)
116    if "" == output:
117        return False
118    packet_lose = re.findall(r"\d+%", output)
119    LOG.info("packet lose=%s " % packet_lose[0])
120    if "0%" == packet_lose[0]:
121        return True
122    return False
123
124
125def query_device_ip(device):
126    output = device.execute_shell_command("getprop ohos.boot.hardware")
127    if output == "":
128        return ""
129
130    isemulator = re.findall(r"read only", output)
131    output = device.execute_shell_command("ifconfig")
132    if output == "":
133        return ""
134
135    if 0 != len(isemulator):
136        ipaddress = re.findall(r"\b10\.0\.2\.[0-9]{1,3}\b", output)
137    else:
138        ip_template = r"\b1[0-9]{1,2}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\b"
139        ipaddress = re.findall(ip_template, output)
140    if len(ipaddress) == 0:
141        LOG.error("get device_%s ip fail,please check the net"
142                  % device.device_sn)
143        return ""
144    return ipaddress[0]
145
146
147def get_test_case(test_case):
148    result = {}
149    for test in test_case:
150        case_dir, file_name = os.path.split(test)
151        if not result.get(case_dir):
152            result[case_dir] = {"suits_dir": case_dir}
153        if file_name.endswith("Test"):
154            result[case_dir]["major_target_name"] = file_name
155        else:
156            result[case_dir]["agent_target_name"] = file_name
157    return list(result.values())
158
159