• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import re
21import stat
22import json
23import time
24import platform
25import subprocess
26import signal
27from threading import Timer
28
29from _core.utils import get_file_absolute_path
30from _core.logger import platform_logger
31from _core.exception import ParamError
32from _core.constants import DeviceTestType
33from _core.constants import FilePermission
34from _core.constants import DeviceConnectorType
35
36LOG = platform_logger("Kit")
37
38TARGET_SDK_VERSION = 22
39
40__all__ = ["get_app_name_by_tool", "junit_para_parse", "gtest_para_parse",
41           "get_install_args", "reset_junit_para", "remount", "disable_keyguard",
42           "timeout_callback", "unlock_screen", "unlock_device", "get_class"]
43
44
45def remount(device):
46    device.enable_hdc_root()
47    cmd = "target mount" \
48        if device.usb_type == DeviceConnectorType.hdc else "remount"
49    device.connector_command(cmd)
50    device.execute_shell_command("remount")
51    device.execute_shell_command("mount -o rw,remount /cust")
52    device.execute_shell_command("mount -o rw,remount /product")
53    device.execute_shell_command("mount -o rw,remount /hw_product")
54    device.execute_shell_command("mount -o rw,remount /version")
55    device.execute_shell_command("mount -o rw,remount /%s" % "system")
56
57
58def get_class(junit_paras, prefix_char, para_name):
59    if not junit_paras.get(para_name):
60        return ""
61
62    result = ""
63    if prefix_char == "-e":
64        result = " %s class " % prefix_char
65    elif prefix_char == "--":
66        result = " %sclass " % prefix_char
67    elif prefix_char == "-s":
68        result = " %s class " % prefix_char
69    test_items = []
70    for test in junit_paras.get(para_name):
71        test_item = test.split("#")
72        if len(test_item) == 1 or len(test_item) == 2:
73            test_item = "%s" % test
74            test_items.append(test_item)
75        elif len(test_item) == 3:
76            test_item = "%s#%s" % (test_item[1], test_item[2])
77            test_items.append(test_item)
78        else:
79            raise ParamError("The parameter %s %s is error" % (
80                             prefix_char, para_name))
81    if not result:
82        LOG.debug("There is unsolved prefix char: %s ." % prefix_char)
83    return result + ",".join(test_items)
84
85
86def junit_para_parse(device, junit_paras, prefix_char="-e"):
87    """To parse the para of junit
88    Args:
89        device: the device running
90        junit_paras: the para dict of junit
91        prefix_char: the prefix char of parsed cmd
92    Returns:
93        the new para using in a command like -e testFile xxx
94        -e coverage true...
95    """
96    ret_str = []
97    path = "/%s/%s/%s" % ("data", "local", "ajur")
98    include_file = "%s/%s" % (path, "includes.txt")
99    exclude_file = "%s/%s" % (path, "excludes.txt")
100
101    if not isinstance(junit_paras, dict):
102        LOG.warning("The para of junit is not the dict format as required")
103        return ""
104    # Disable screen keyguard
105    disable_key_guard = junit_paras.get('disable-keyguard')
106    if not disable_key_guard or disable_key_guard[0].lower() != 'false':
107        disable_keyguard(device)
108
109    for para_name in junit_paras.keys():
110        path = "/%s/%s/%s/" % ("data", "local", "ajur")
111        if para_name.strip() == 'test-file-include-filter':
112            for file_name in junit_paras[para_name]:
113                device.push_file(file_name, include_file)
114                device.execute_shell_command(
115                    'chown -R shell:shell %s' % path)
116            ret_str.append(" ".join([prefix_char, 'testFile', include_file]))
117        elif para_name.strip() == "test-file-exclude-filter":
118            for file_name in junit_paras[para_name]:
119                device.push_file(file_name, exclude_file)
120                device.execute_shell_command(
121                    'chown -R shell:shell %s' % path)
122            ret_str.append(" ".join([prefix_char, 'notTestFile',
123                                     exclude_file]))
124        elif para_name.strip() == "test" or para_name.strip() == "class":
125            result = get_class(junit_paras, prefix_char, para_name.strip())
126            ret_str.append(result)
127        elif para_name.strip() == "include-annotation":
128            ret_str.append(" ".join([prefix_char, "annotation",
129                                     ",".join(junit_paras[para_name])]))
130        elif para_name.strip() == "exclude-annotation":
131            ret_str.append(" ".join([prefix_char, "notAnnotation",
132                                     ",".join(junit_paras[para_name])]))
133        else:
134            ret_str.append(" ".join([prefix_char, para_name,
135                                     ",".join(junit_paras[para_name])]))
136
137    return " ".join(ret_str)
138
139
140def get_include_tests(para_datas, test_types, runner):
141    case_list = []
142    if test_types == "class":
143        case_list = para_datas
144    else:
145        for case_file in para_datas:
146            flags = os.O_RDONLY
147            modes = stat.S_IWUSR | stat.S_IRUSR
148            with os.fdopen(os.open(case_file, flags, modes), "r") as file_desc:
149                case_list.extend(file_desc.read().splitlines())
150    runner.add_instrumentation_arg("gtest_filter", ":".join(case_list).replace("#", "."))
151
152
153def get_all_test_include(para_datas, test_types, runner, request):
154    case_list = []
155    if test_types == "notClass":
156        case_list = para_datas
157    else:
158        if para_datas:
159            flags = os.O_RDONLY
160            modes = stat.S_IWUSR | stat.S_IRUSR
161            with os.fdopen(os.open(para_datas[0], flags, modes), "r") as file_handler:
162                json_data = json.load(file_handler)
163            exclude_list = json_data.get(DeviceTestType.cpp_test, [])
164            for exclude in exclude_list:
165                if request.get_module_name() in exclude:
166                    temp = exclude.get(request.get_module_name())
167                    case_list.extend(temp)
168    runner.add_instrumentation_arg("gtest_filter", "{}{}".format("-", ":".join(case_list)).replace("#", "."))
169
170
171def gtest_para_parse(gtest_paras, runner, request):
172    """To parse the para of gtest
173    Args:
174        gtest_paras: the para dict of gtest
175    Returns:
176        the new para using in gtest
177    """
178    if not isinstance(gtest_paras, dict):
179        LOG.warning("The para of gtest is not the dict format as required")
180        return ""
181
182    for para in gtest_paras.keys():
183        test_types = para.strip()
184        para_datas = gtest_paras.get(para)
185        if test_types in ["test-file-include-filter", "class"]:
186            get_include_tests(para_datas, test_types, runner)
187        elif test_types in ["all-test-file-exclude-filter", "notClass"]:
188            get_all_test_include(para_datas, test_types, runner, request)
189    return ""
190
191
192def reset_junit_para(junit_para_str, prefix_char="-e", ignore_keys=None):
193    if not ignore_keys and not isinstance(ignore_keys, list):
194        ignore_keys = ["class", "test"]
195    lines = junit_para_str.split("%s " % prefix_char)
196    normal_lines = []
197    for line in lines:
198        line = line.strip()
199        if line:
200            items = line.split()
201            if items[0].strip() in ignore_keys:
202                continue
203            normal_lines.append("{} {}".format(prefix_char, line))
204    return " ".join(normal_lines)
205
206
207def get_install_args(device, app_name, original_args=None):
208    """To obtain all the args of app install
209    Args:
210        original_args: the argus configure in .config file
211        device : the device will be installed app
212        app_name : the name of the app which will be installed
213    Returns:
214        All the args
215    """
216    if original_args is None:
217        original_args = []
218    new_args = original_args[:]
219    try:
220        sdk_version = device.get_property("ro.build.version.sdk")
221        if int(sdk_version) > TARGET_SDK_VERSION:
222            new_args.append("-g")
223    except TypeError as type_error:
224        LOG.error("Obtain the sdk version failed with exception {}".format(
225            type_error))
226    except ValueError as value_error:
227        LOG.error("Obtain the sdk version failed with exception {}".format(
228            value_error))
229    if app_name.endswith(".apex"):
230        new_args.append("--apex")
231    return " ".join(new_args)
232
233
234def get_app_name_by_tool(app_path, paths):
235    """To obtain the app name by using tool
236    Args:
237        app_path: the path of app
238        paths:
239    Returns:
240        The Pkg Name if found else None
241    """
242    rex = "^package:\\s+name='(.*?)'.*$"
243    aapt_tool_name = "aapt.exe" if os.name == "nt" else "aapt"
244    if app_path:
245        proc_timer = None
246        try:
247            tool_file = get_file_absolute_path(os.path.join(
248                "tools", aapt_tool_name), paths)
249            LOG.debug("Aapt file is %s" % tool_file)
250
251            if platform.system() == "Linux":
252                if not oct(os.stat(tool_file).st_mode)[-3:] == "755":
253                    os.chmod(tool_file, FilePermission.mode_755)
254
255            cmd = [tool_file, "dump", "badging", app_path]
256            timeout = 300
257            LOG.info("Execute command %s with %s" % (" ".join(cmd), timeout))
258
259            sub_process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
260                                           stderr=subprocess.PIPE)
261            proc_timer = Timer(timeout, timeout_callback, [sub_process])
262            proc_timer.start()
263            # The package name must be return in first line
264            output = sub_process.stdout.readline()
265            error = sub_process.stderr.readline()
266            LOG.debug("The output of aapt is {}".format(output))
267            if error:
268                LOG.debug("The error of aapt is {}".format(error))
269            if output:
270                pkg_match = re.match(rex, output.decode("utf8", 'ignore'))
271                if pkg_match is not None:
272                    LOG.info(
273                        "Obtain the app name {} successfully by using "
274                        "aapt".format(pkg_match.group(1)))
275                    return pkg_match.group(1)
276            return None
277        except (FileNotFoundError, ParamError) as error:
278            LOG.debug("Aapt error: %s", error.args)
279            return None
280        finally:
281            if proc_timer:
282                proc_timer.cancel()
283    else:
284        LOG.error("get_app_name_by_tool error.")
285        return None
286
287
288def timeout_callback(proc):
289    try:
290        LOG.error("Error: execute command timeout.")
291        LOG.error(proc.pid)
292        if platform.system() != "Windows":
293            os.killpg(proc.pid, signal.SIGKILL)
294        else:
295            subprocess.call(
296                ["C:\\Windows\\System32\\taskkill", "/F", "/T", "/PID",
297                 str(proc.pid)], shell=False)
298    except (FileNotFoundError, KeyboardInterrupt, AttributeError) as error:
299        LOG.exception("Timeout callback exception: %s" % error, exc_info=False)
300
301
302def disable_keyguard(device):
303    unlock_screen(device)
304    unlock_device(device)
305
306
307def unlock_screen(device):
308    device.execute_shell_command("svc power stayon true")
309    time.sleep(1)
310
311
312def unlock_device(device):
313    device.execute_shell_command("input keyevent 82")
314    time.sleep(1)
315    device.execute_shell_command("wm dismiss-keyguard")
316    time.sleep(1)
317