1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import os 20import subprocess 21import sys 22import re 23import stat 24 25from xdevice import platform_logger 26from xdevice import EnvironmentManager 27from xdevice import ResultReporter 28from xdevice import ExecInfo 29 30FLAGS = os.O_WRONLY | os.O_CREAT | os.O_EXCL 31MODES = stat.S_IWUSR | stat.S_IRUSR 32 33LOG = platform_logger("distribute_utils") 34 35 36def make_device_info_file(tmp_path): 37 env_manager = EnvironmentManager() 38 device_info_file_path = os.path.join(tmp_path, 39 "device_info_file.txt") 40 if os.path.exists(device_info_file_path): 41 os.remove(device_info_file_path) 42 with os.fdopen(os.open(device_info_file_path, FLAGS, MODES), 'w') as file_handle: 43 if not env_manager.managers: 44 return 45 if list(env_manager.managers.values())[0].devices_list: 46 for device in list(env_manager.managers.values())[0].devices_list: 47 get_device_info(device, file_handle) 48 else: 49 for device in list(env_manager.managers.values())[1].devices_list: 50 get_device_info(device, file_handle) 51 52 53def get_device_info(device, file_handle): 54 """ 55 56 :param device: 57 :param file_handle: 58 :return: 59 """ 60 if device.test_device_state.value == "ONLINE": 61 status = device.label if device.label else 'None' 62 LOG.info("%s,%s" % (device.device_sn, status)) 63 file_handle.write("%s,%s,%s,%s\n" % ( 64 device.device_sn, 65 device.label if device.label else 'None', 66 device.host, 67 device.port)) 68 69 70def make_reports(result_rootpath, start_time): 71 exec_info = ExecInfo() 72 exec_info.test_type = "systemtest" 73 exec_info.device_name = "ALL" 74 exec_info.host_info = "" 75 exec_info.test_time = start_time 76 exec_info.log_path = os.path.join(result_rootpath, "log") 77 exec_info.platform = "ALL" 78 exec_info.execute_time = "" 79 80 result_report = ResultReporter() 81 result_report.__generate_reports__(report_path=result_rootpath, 82 task_info=exec_info) 83 return True 84 85 86def check_ditributetest_environment(): 87 env_manager = EnvironmentManager() 88 devices_list = list(env_manager.managers.values())[0].devices_list 89 if not devices_list: 90 devices_list = list(env_manager.managers.values())[1].devices_list 91 evn_status = True 92 if len(devices_list) == 0: 93 LOG.error("no devices online") 94 return False 95 device_zero = devices_list[0] 96 for device in devices_list: 97 if device != device_zero: 98 device_ip = query_device_ip(device) 99 if device_ip == "": 100 evn_status = False 101 continue 102 if not check_zdn_network(device, device_ip): 103 LOG.error("device_%s ping devices_%s failed" % ( 104 device_zero.device_sn, device.device_sn)) 105 evn_status = False 106 if not evn_status: 107 LOG.error("Environment status = False, test end") 108 return False 109 LOG.info("Environment status == True") 110 return True 111 112 113def check_zdn_network(device, device_ip=""): 114 command = "ping -c 3 " + device_ip 115 output = device.execute_shell_command(command) 116 if "" == output: 117 return False 118 packet_lose = re.findall(r"\d+%", output) 119 LOG.info("packet lose=%s " % packet_lose[0]) 120 if "0%" == packet_lose[0]: 121 return True 122 return False 123 124 125def query_device_ip(device): 126 output = device.execute_shell_command("getprop ohos.boot.hardware") 127 if output == "": 128 return "" 129 130 isemulator = re.findall(r"read only", output) 131 output = device.execute_shell_command("ifconfig") 132 if output == "": 133 return "" 134 135 if 0 != len(isemulator): 136 ipaddress = re.findall(r"\b10\.0\.2\.[0-9]{1,3}\b", output) 137 else: 138 ip_template = r"\b1[0-9]{1,2}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\b" 139 ipaddress = re.findall(ip_template, output) 140 if len(ipaddress) == 0: 141 LOG.error("get device_%s ip fail,please check the net" 142 % device.device_sn) 143 return "" 144 return ipaddress[0] 145 146 147def get_test_case(test_case): 148 result = {} 149 for test in test_case: 150 case_dir, file_name = os.path.split(test) 151 if not result.get(case_dir): 152 result[case_dir] = {"suits_dir": case_dir} 153 if file_name.endswith("Test"): 154 result[case_dir]["major_target_name"] = file_name 155 else: 156 result[case_dir]["agent_target_name"] = file_name 157 return list(result.values()) 158 159