1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2021 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import os 20import platform 21import subprocess 22import tempfile 23from abc import ABCMeta 24from abc import abstractmethod 25import stat 26 27from core.config.resource_manager import ResourceManager 28 29 30############################################################################## 31############################################################################## 32 33FLAGS = os.O_WRONLY | os.O_APPEND | os.O_CREAT 34MODES = stat.S_IWUSR | stat.S_IRUSR 35 36DEVICE_TEST_PATH = "/%s/%s/" % ("data", "test") 37 38 39def get_level_para_string(level_string): 40 level_list = list(set(level_string.split(","))) 41 level_para_string = "" 42 for item in level_list: 43 if not item.isdigit(): 44 continue 45 item = item.strip(" ") 46 level_para_string = (f"{level_para_string}Level{item},") 47 level_para_string = level_para_string.strip(",") 48 return level_para_string 49 50 51def make_long_command_file(command, longcommand_path, filename): 52 sh_file_name = '%s.sh' % filename 53 file_path = os.path.join(longcommand_path, sh_file_name) 54 try: 55 with os.fdopen(os.open(file_path, FLAGS, MODES), 'a') as file_desc: 56 file_desc.write(command) 57 except(IOError, ValueError) as err_msg: 58 print(f"Error for make long command file: {file_path}") 59 return sh_file_name, file_path 60 61 62def is_exist_target_in_device(device, path, target): 63 command = "ls -l %s | grep %s" % (path, target) 64 check_result = False 65 stdout_info = device.shell_with_output(command) 66 if stdout_info != "" and stdout_info.find(target) != -1: 67 check_result = True 68 return check_result 69 70 71def receive_coverage_data(device, result_path, suite_file, file_name): 72 file_dir = suite_file.split("distributedtest")[1].strip(os.sep).split(os.sep)[0] 73 target_name = "obj" 74 cxx_cov_path = os.path.abspath(os.path.join( 75 result_path, 76 "..", 77 "coverage", 78 "data", 79 "cxx", 80 file_name + '_' + file_dir + '_' + device.device_sn)) 81 82 if is_exist_target_in_device(device, DEVICE_TEST_PATH, "obj"): 83 if not os.path.exists(cxx_cov_path): 84 os.makedirs(cxx_cov_path) 85 device.shell( 86 "cd %s; tar -czf %s.tar.gz %s" % (DEVICE_TEST_PATH, target_name, target_name)) 87 src_file_tar = os.path.join(DEVICE_TEST_PATH, "%s.tar.gz" % target_name) 88 device.pull_file(src_file_tar, cxx_cov_path) 89 tar_path = os.path.join(cxx_cov_path, "%s.tar.gz" % target_name) 90 if platform.system() == "Windows": 91 process = subprocess.Popen("tar -zxf %s -C %s" % (tar_path, cxx_cov_path), shell=True) 92 process.communicate() 93 os.remove(tar_path) 94 else: 95 subprocess.Popen("tar -zxf %s -C %s > /dev/null 2>&1" % 96 (tar_path, cxx_cov_path), shell=True) 97 subprocess.Popen("rm -rf %s" % tar_path, shell=True) 98 99 100############################################################################## 101############################################################################## 102 103 104class ITestDriver: 105 __metaclass__ = ABCMeta 106 107 @abstractmethod 108 def execute(self, suite_file, result_path, options, push_flag=False): 109 pass 110 111 112############################################################################## 113############################################################################## 114 115 116class CppTestDriver(ITestDriver): 117 def __init__(self, device, hdc_tools): 118 self.device = device 119 self.hdc_tools = hdc_tools 120 121 def execute(self, suite_file, result_path, options, background=False): 122 case_dir, file_name = os.path.split(suite_file) 123 dst_dir = case_dir.split("distributedtest")[1].strip(os.sep) 124 os.makedirs(os.path.join(result_path, "temp", dst_dir), exist_ok=True) 125 126 long_command_path = tempfile.mkdtemp( 127 prefix="long_command_", 128 dir=os.path.join(result_path, "temp", dst_dir)) 129 if not options.coverage: 130 command = "cd %s; rm -rf %s.xml; chmod +x *; ./%s" % ( 131 DEVICE_TEST_PATH, 132 file_name, 133 file_name) 134 else: 135 coverage_outpath = options.coverage_outpath 136 strip_num = len(coverage_outpath.split("/")) - 1 137 command = "cd %s; rm -rf %s.xml; chmod +x *; GCOV_PREFIX=. " \ 138 "GCOV_PREFIX_STRIP=%s ./%s" % \ 139 (DEVICE_TEST_PATH, 140 file_name, 141 str(strip_num), 142 file_name, 143 ) 144 145 print("command: %s" % command) 146 sh_file_name, file_path = make_long_command_file(command, 147 long_command_path, 148 file_name) 149 self.device.push_file(file_path, DEVICE_TEST_PATH) 150 151 # push resource files 152 resource_manager = ResourceManager() 153 resource_data_dic, resource_dir = \ 154 resource_manager.get_resource_data_dic(suite_file) 155 resource_manager.process_preparer_data(resource_data_dic, resource_dir, 156 self.device) 157 if background: 158 sh_command = "nohup sh %s >%s 2>&1 &" % ( 159 os.path.join(DEVICE_TEST_PATH, sh_file_name), 160 os.path.join(DEVICE_TEST_PATH, "agent.log")) 161 else: 162 sh_command = "sh %s" % ( 163 os.path.join(DEVICE_TEST_PATH, sh_file_name)) 164 self.device.shell(sh_command) 165 166 if options.coverage: 167 receive_coverage_data(self.device, result_path, suite_file, file_name) 168 169 if os.path.exists(long_command_path): 170 os.remove(long_command_path) 171 172 173############################################################################## 174############################################################################## 175 176