• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2021 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import platform
21import subprocess
22import tempfile
23from abc import ABCMeta
24from abc import abstractmethod
25import stat
26
27from core.config.resource_manager import ResourceManager
28
29
30##############################################################################
31##############################################################################
32
33FLAGS = os.O_WRONLY | os.O_APPEND | os.O_CREAT
34MODES = stat.S_IWUSR | stat.S_IRUSR
35
36DEVICE_TEST_PATH = "/%s/%s/" % ("data", "test")
37
38
39def get_level_para_string(level_string):
40    level_list = list(set(level_string.split(",")))
41    level_para_string = ""
42    for item in level_list:
43        if not item.isdigit():
44            continue
45        item = item.strip(" ")
46        level_para_string = (f"{level_para_string}Level{item},")
47    level_para_string = level_para_string.strip(",")
48    return level_para_string
49
50
51def make_long_command_file(command, longcommand_path, filename):
52    sh_file_name = '%s.sh' % filename
53    file_path = os.path.join(longcommand_path, sh_file_name)
54    try:
55        with os.fdopen(os.open(file_path, FLAGS, MODES), 'a') as file_desc:
56            file_desc.write(command)
57    except(IOError, ValueError) as err_msg:
58        print(f"Error for make long command file: {file_path}")
59    return sh_file_name, file_path
60
61
62def is_exist_target_in_device(device, path, target):
63    command = "ls -l %s | grep %s" % (path, target)
64    check_result = False
65    stdout_info = device.shell_with_output(command)
66    if stdout_info != "" and stdout_info.find(target) != -1:
67        check_result = True
68    return check_result
69
70
71def receive_coverage_data(device, result_path, suite_file, file_name):
72    file_dir = suite_file.split("distributedtest")[1].strip(os.sep).split(os.sep)[0]
73    target_name = "obj"
74    cxx_cov_path = os.path.abspath(os.path.join(
75        result_path,
76        "..",
77        "coverage",
78        "data",
79        "cxx",
80        file_name + '_' + file_dir + '_' + device.device_sn))
81
82    if is_exist_target_in_device(device, DEVICE_TEST_PATH, "obj"):
83        if not os.path.exists(cxx_cov_path):
84            os.makedirs(cxx_cov_path)
85        device.shell(
86            "cd %s; tar -czf %s.tar.gz %s" % (DEVICE_TEST_PATH, target_name, target_name))
87        src_file_tar = os.path.join(DEVICE_TEST_PATH, "%s.tar.gz" % target_name)
88        device.pull_file(src_file_tar, cxx_cov_path)
89        tar_path = os.path.join(cxx_cov_path, "%s.tar.gz" % target_name)
90        if platform.system() == "Windows":
91            process = subprocess.Popen("tar -zxf %s -C %s" % (tar_path, cxx_cov_path), shell=True)
92            process.communicate()
93            os.remove(tar_path)
94        else:
95            subprocess.Popen("tar -zxf %s -C %s > /dev/null 2>&1" %
96                             (tar_path, cxx_cov_path), shell=True)
97            subprocess.Popen("rm -rf %s" % tar_path, shell=True)
98
99
100##############################################################################
101##############################################################################
102
103
104class ITestDriver:
105    __metaclass__ = ABCMeta
106
107    @abstractmethod
108    def execute(self, suite_file, result_path, options, push_flag=False):
109        pass
110
111
112##############################################################################
113##############################################################################
114
115
116class CppTestDriver(ITestDriver):
117    def __init__(self, device, hdc_tools):
118        self.device = device
119        self.hdc_tools = hdc_tools
120
121    def execute(self, suite_file, result_path, options, background=False):
122        case_dir, file_name = os.path.split(suite_file)
123        dst_dir = case_dir.split("distributedtest")[1].strip(os.sep)
124        os.makedirs(os.path.join(result_path, "temp", dst_dir), exist_ok=True)
125
126        long_command_path = tempfile.mkdtemp(
127            prefix="long_command_",
128            dir=os.path.join(result_path, "temp", dst_dir))
129        if not options.coverage:
130            command = "cd %s; rm -rf %s.xml; chmod +x *; ./%s" % (
131                DEVICE_TEST_PATH,
132                file_name,
133                file_name)
134        else:
135            coverage_outpath = options.coverage_outpath
136            strip_num = len(coverage_outpath.split("/")) - 1
137            command = "cd %s; rm -rf %s.xml; chmod +x *; GCOV_PREFIX=. " \
138                      "GCOV_PREFIX_STRIP=%s ./%s" % \
139                      (DEVICE_TEST_PATH,
140                       file_name,
141                       str(strip_num),
142                       file_name,
143                       )
144
145        print("command: %s" % command)
146        sh_file_name, file_path = make_long_command_file(command,
147                                                         long_command_path,
148                                                         file_name)
149        self.device.push_file(file_path, DEVICE_TEST_PATH)
150
151        # push resource files
152        resource_manager = ResourceManager()
153        resource_data_dic, resource_dir = \
154            resource_manager.get_resource_data_dic(suite_file)
155        resource_manager.process_preparer_data(resource_data_dic, resource_dir,
156                                               self.device)
157        if background:
158            sh_command = "nohup sh %s >%s 2>&1 &" % (
159                os.path.join(DEVICE_TEST_PATH, sh_file_name),
160                os.path.join(DEVICE_TEST_PATH, "agent.log"))
161        else:
162            sh_command = "sh %s" % (
163                os.path.join(DEVICE_TEST_PATH, sh_file_name))
164        self.device.shell(sh_command)
165
166        if options.coverage:
167            receive_coverage_data(self.device, result_path, suite_file, file_name)
168
169        if os.path.exists(long_command_path):
170            os.remove(long_command_path)
171
172
173##############################################################################
174##############################################################################
175
176