#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import platform
import subprocess
import tempfile
from abc import ABCMeta
from abc import abstractmethod

from core.config.resource_manager import ResourceManager


##############################################################################
##############################################################################

DEVICE_TEST_PATH = "/%s/%s/" % ("data", "test")

# Name of the directory below which suite paths are considered "relative".
_SUITE_ROOT_MARKER = "distributedtest"


def _suite_relative_path(path):
    """Return the part of *path* located below the ``distributedtest`` dir.

    Both ``\\`` and ``/`` are accepted as separators so the lookup works
    regardless of the host platform that produced the path. The result always
    uses ``/`` as separator. An empty string is returned when *path* has no
    component below a ``distributedtest`` directory.
    """
    normalized = path.replace("\\", "/")
    _, found, tail = normalized.partition(_SUITE_ROOT_MARKER + "/")
    return tail if found else ""


def get_level_para_string(level_string):
    """Convert a comma separated list of levels into ``LevelN`` notation.

    Each entry is stripped of surrounding spaces; entries that are not purely
    numeric are ignored and duplicates are removed while keeping the order of
    first appearance.

    Args:
        level_string: text such as ``"0, 1,2"``.

    Returns:
        A string such as ``"Level0,Level1,Level2"`` (empty when no entry is
        numeric).
    """
    levels = []
    for raw_item in level_string.split(","):
        # Strip before validating, otherwise " 1" would be rejected.
        item = raw_item.strip(" ")
        if item.isdigit() and item not in levels:
            levels.append(item)
    return ",".join("Level%s" % item for item in levels)


def make_long_command_file(command, longcommand_path, filename):
    """Write *command* into ``<longcommand_path>/<filename>.sh``.

    The file is opened in append mode. Failures to write are reported on
    stdout and otherwise ignored, so the returned path may not exist.

    Returns:
        A ``(sh_file_name, file_path)`` tuple: the bare script name and its
        full path on the host.
    """
    sh_file_name = '%s.sh' % filename
    file_path = os.path.join(longcommand_path, sh_file_name)
    try:
        with open(file_path, "a") as file_desc:
            file_desc.write(command)
    except (OSError, ValueError) as err_msg:
        print("Error for make long command file: ", err_msg)
    return sh_file_name, file_path


def is_exist_target_in_device(device, path, target):
    """Tell whether ``ls -l <path> | grep <target>`` on *device* shows *target*.

    Args:
        device: object providing ``shell_with_output(command)``.
        path: directory on the device to list.
        target: name to look for in the listing.

    Returns:
        True when the command output is non-empty and contains *target*.
    """
    command = "ls -l %s | grep %s" % (path, target)
    stdout_info = device.shell_with_output(command)
    return bool(stdout_info) and target in stdout_info


def receive_coverage_data(device, result_path, suite_file, file_name):
    """Fetch the ``obj`` coverage directory from *device* and unpack it.

    When ``obj`` exists under ``DEVICE_TEST_PATH`` it is archived on the
    device, pulled into
    ``<result_path>/../coverage/data/cxx/<file_name>_<dir>_<device_sn>``,
    extracted there with the host ``tar`` command, and the pulled archive is
    removed afterwards. ``<dir>`` is the first path component of *suite_file*
    below the ``distributedtest`` directory.

    Args:
        device: object providing ``shell``, ``shell_with_output``,
            ``pull_file`` and ``device_sn``.
        result_path: host directory of the test results.
        suite_file: host path of the executed test suite.
        file_name: name of the test binary.
    """
    file_dir = _suite_relative_path(suite_file).split("/")[0]
    target_name = "obj"
    cxx_cov_path = os.path.abspath(os.path.join(
        result_path,
        "..",
        "coverage",
        "data",
        "cxx",
        file_name + '_' + file_dir + '_' + device.device_sn))

    if not is_exist_target_in_device(device, DEVICE_TEST_PATH, target_name):
        return

    os.makedirs(cxx_cov_path, exist_ok=True)
    device.shell(
        "cd %s; tar -czf %s.tar.gz %s" % (DEVICE_TEST_PATH, target_name,
                                          target_name))
    src_file_tar = os.path.join(DEVICE_TEST_PATH, "%s.tar.gz" % target_name)
    device.pull_file(src_file_tar, cxx_cov_path)
    tar_path = os.path.join(cxx_cov_path, "%s.tar.gz" % target_name)

    # Extraction must have finished before the archive is deleted, so the
    # tar process is awaited instead of being started in the background.
    try:
        completed = subprocess.run(
            ["tar", "-zxf", tar_path, "-C", cxx_cov_path],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=False)
        if completed.returncode != 0:
            print("Error for extracting coverage data, tar exit code: ",
                  completed.returncode)
    except OSError as err_msg:
        print("Error for extracting coverage data: ", err_msg)

    if os.path.exists(tar_path):
        os.remove(tar_path)


##############################################################################
##############################################################################


class ITestDriver(metaclass=ABCMeta):
    """Abstract interface implemented by every test driver."""

    @abstractmethod
    def execute(self, suite_file, result_path, options, push_flag=False):
        """Run the suite *suite_file* and store results in *result_path*."""


##############################################################################
##############################################################################


class CppTestDriver(ITestDriver):
    """Driver that runs a C++ test binary on a device through a shell script."""

    def __init__(self, device, hdc_tools):
        self.device = device
        self.hdc_tools = hdc_tools

    def execute(self, suite_file, result_path, options, background=False):
        """Push a launcher script for *suite_file* to the device and run it.

        Args:
            suite_file: host path of the test binary; the launcher script is
                staged under ``<result_path>/temp/<dirs below distributedtest>``.
            result_path: host directory of the test results.
            options: object providing ``coverage`` and, when coverage is
                enabled, ``coverage_outpath``.
            background: when True the script is started with ``nohup`` in the
                background and its output is redirected to ``agent.log`` in
                ``DEVICE_TEST_PATH``; otherwise it runs in the foreground.
        """
        case_dir, file_name = os.path.split(suite_file)
        # The trailing "/" lets a suite that sits directly inside the
        # "distributedtest" directory resolve to an empty relative path.
        dst_dir = _suite_relative_path(case_dir + "/").strip("/")
        temp_dir = os.path.join(result_path, "temp",
                                dst_dir.replace("/", os.sep))
        os.makedirs(temp_dir, exist_ok=True)

        long_command_path = tempfile.mkdtemp(
            prefix="long_command_",
            dir=temp_dir)
        if not options.coverage:
            command = "cd %s; rm -rf %s.xml; chmod +x *; ./%s" % (
                DEVICE_TEST_PATH,
                file_name,
                file_name)
        else:
            coverage_outpath = options.coverage_outpath
            # Number of "/"-separated components of the coverage output
            # path, minus one, handed to the binary as GCOV_PREFIX_STRIP.
            strip_num = len(coverage_outpath.split("/")) - 1
            command = "cd %s; rm -rf %s.xml; chmod +x *; GCOV_PREFIX=. " \
                      "GCOV_PREFIX_STRIP=%s ./%s" % \
                      (DEVICE_TEST_PATH,
                       file_name,
                       str(strip_num),
                       file_name,
                       )

        print("command: %s" % command)
        sh_file_name, file_path = make_long_command_file(command,
                                                         long_command_path,
                                                         file_name)
        self.device.push_file(file_path, DEVICE_TEST_PATH)

        # push resource files
        resource_manager = ResourceManager()
        resource_data_dic, resource_dir = \
            resource_manager.get_resource_data_dic(suite_file)
        resource_manager.process_preparer_data(resource_data_dic, resource_dir,
                                               self.device)

        device_script = os.path.join(DEVICE_TEST_PATH, sh_file_name)
        if background:
            sh_command = "nohup sh %s >%s 2>&1 &" % (
                device_script,
                os.path.join(DEVICE_TEST_PATH, "agent.log"))
        else:
            sh_command = "sh %s" % device_script
        self.device.shell(sh_command)

        if options.coverage:
            receive_coverage_data(self.device, result_path, suite_file,
                                  file_name)


##############################################################################
##############################################################################