#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import subprocess
import sys
import re
from xdevice import platform_logger
from xdevice import EnvironmentManager
from xdevice import ResultReporter
from xdevice import ExecInfo

LOG = platform_logger("distribute_utils")


def _select_devices_list(env_manager):
    """
    Pick the device list used by the distributed-test helpers.

    The first manager's ``devices_list`` is used when it is non-empty,
    otherwise the second manager's list is used.  Missing managers are
    tolerated instead of raising ``IndexError``.

    :param env_manager: object exposing a ``managers`` mapping whose values
        carry a ``devices_list`` attribute
    :return: the selected device list, or an empty list when neither of the
        first two managers has any device
    """
    if not env_manager.managers:
        return []
    # Only the first two managers are consulted, matching the historical
    # "[0], else [1]" lookup.
    for manager in list(env_manager.managers.values())[:2]:
        if manager.devices_list:
            return manager.devices_list
    return []


def make_device_info_file(tmp_path):
    """
    Write ``device_info_file.txt`` under ``tmp_path``.

    One line is written for every device whose state is "ONLINE" (see
    get_device_info).  The file is always created, even when no manager or
    no device is available; in that case it is left empty.

    :param tmp_path: directory in which the file is created
    :return: None
    """
    env_manager = EnvironmentManager()
    device_info_file_path = os.path.join(tmp_path,
                                         "device_info_file.txt")
    with open(device_info_file_path, "w") as file_handle:
        for device in _select_devices_list(env_manager):
            get_device_info(device, file_handle)


def get_device_info(device, file_handle):
    """
    Log and persist one device record if the device is online.

    The record has the form ``<sn>,<label>,<host>,<port>`` followed by a
    newline; a falsy label is written as the string 'None'.

    :param device: object providing ``test_device_state.value``,
        ``device_sn``, ``label``, ``host`` and ``port``
    :param file_handle: writable text file object receiving the record
    :return: None
    """
    if device.test_device_state.value != "ONLINE":
        return
    status = device.label if device.label else 'None'
    LOG.info("%s,%s" % (device.device_sn, status))
    file_handle.write("%s,%s,%s,%s\n" % (
        device.device_sn,
        status,
        device.host,
        device.port))


def make_reports(result_rootpath, start_time):
    """
    Generate the reports for a finished run below ``result_rootpath``.

    An ExecInfo is filled with fixed values ("systemtest", "ALL", ...) plus
    the given start time and ``<result_rootpath>/log`` as log path, and is
    handed to ResultReporter.__generate_reports__.

    :param result_rootpath: root directory of the results; also used as the
        report path
    :param start_time: value stored as ``test_time`` of the execution info
    :return: always True
    """
    exec_info = ExecInfo()
    exec_info.test_type = "systemtest"
    exec_info.device_name = "ALL"
    exec_info.host_info = ""
    exec_info.test_time = start_time
    exec_info.log_path = os.path.join(result_rootpath, "log")
    exec_info.platform = "ALL"
    exec_info.execute_time = ""

    result_report = ResultReporter()
    result_report.__generate_reports__(report_path=result_rootpath,
                                       task_info=exec_info)
    return True


def check_ditributetest_environment():
    """
    Verify that the connected devices are usable for a distributed test.

    For every device except the first one the IP address is queried and a
    ping check is run.  All devices are checked even after a failure so
    that every problem is logged.

    :return: True when every check passed, False when there is no device,
        an IP address could not be determined, or a ping check failed
    """
    env_manager = EnvironmentManager()
    devices_list = _select_devices_list(env_manager)
    if not devices_list:
        LOG.error("no devices online")
        return False

    evn_status = True
    device_zero = devices_list[0]
    for device in devices_list:
        if device == device_zero:
            continue
        device_ip = query_device_ip(device)
        if device_ip == "":
            evn_status = False
            continue
        # NOTE(review): the ping is executed on `device` against its own
        # address, while the message below names device_zero as the source.
        # Confirm which device is supposed to run the ping.
        if not check_zdn_network(device, device_ip):
            LOG.error("device_%s ping devices_%s failed" % (
                device_zero.device_sn, device.device_sn))
            evn_status = False

    if not evn_status:
        LOG.error("Environment status = False, test end")
        return False
    LOG.info("Environment status == True")
    return True


def check_zdn_network(device, device_ip=""):
    """
    Ping ``device_ip`` three times from ``device`` and check the loss rate.

    The first ``<digits>%`` token found in the command output is taken as
    the packet-loss figure.

    :param device: object providing ``execute_shell_command(command)``
    :param device_ip: address to ping; an empty value fails the check
        without running any command
    :return: True only when the reported loss is exactly "0%"; False when
        the address is empty, the command produced no output, or no
        percentage could be found in the output
    """
    if not device_ip:
        LOG.error("no ip address given, ping skipped")
        return False
    command = "ping -c 3 " + device_ip
    output = device.execute_shell_command(command)
    if not output:
        return False
    packet_lose = re.findall(r"\d+%", output)
    if not packet_lose:
        # e.g. a usage/error message instead of ping statistics
        LOG.error("no packet loss rate found in ping output of %s"
                  % device_ip)
        return False
    LOG.info("packet lose=%s " % packet_lose[0])
    return packet_lose[0] == "0%"


def query_device_ip(device):
    """
    Determine the IP address of ``device`` from its ``ifconfig`` output.

    ``getprop ohos.boot.hardware`` is run first; when its output contains
    "read only" (presumably an emulator, judging by the original variable
    name) a 10.0.2.x address is searched for, otherwise a 192.168.x.x
    address.

    :param device: object providing ``execute_shell_command(command)`` and
        ``device_sn``
    :return: the first matching address, or "" when a command returned an
        empty string or no address matched
    """
    hardware = device.execute_shell_command("getprop ohos.boot.hardware")
    if hardware == "":
        return ""
    isemulator = re.findall(r"read only", hardware)

    output = device.execute_shell_command("ifconfig")
    if output == "":
        return ""

    if isemulator:
        ip_template = r"\b10\.0\.2\.[0-9]{1,3}\b"
    else:
        ip_template = r"\b192\.168\.(?:[0-9]{1,3}\.)[0-9]{1,3}\b"
    ipaddress = re.findall(ip_template, output)
    if not ipaddress:
        LOG.error("get device_%s ip fail,please check the net"
                  % device.device_sn)
        return ""
    return ipaddress[0]


def get_test_case(test_case):
    """
    Group test target paths by their directory.

    Each path is split into directory and file name.  A file name ending
    with "Test" is stored as ``major_target_name`` of its directory, any
    other file name as ``agent_target_name``; a later path of the same kind
    in the same directory overwrites the earlier one.

    :param test_case: iterable of path strings
    :return: list of dicts, one per directory in first-seen order, each
        holding ``suits_dir`` plus the target names found for it
    """
    result = {}
    for test in test_case:
        case_dir, file_name = os.path.split(test)
        sub_data = result.setdefault(case_dir, {"suits_dir": case_dir})
        if file_name.endswith("Test"):
            sub_data["major_target_name"] = file_name
        else:
            sub_data["agent_target_name"] = file_name
    return list(result.values())