• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import re
21import stat
22import json
23import time
24import platform
25import subprocess
26import signal
27from threading import Timer
28
29from _core.utils import get_file_absolute_path
30from _core.logger import platform_logger
31from _core.exception import ParamError
32from _core.constants import DeviceTestType
33from _core.constants import FilePermission
34from _core.constants import DeviceConnectorType
35
36LOG = platform_logger("Kit")
37
38TARGET_SDK_VERSION = 22
39
40__all__ = ["get_app_name_by_tool", "junit_para_parse", "gtest_para_parse",
41           "get_install_args", "reset_junit_para", "remount", "disable_keyguard",
42           "timeout_callback", "unlock_screen", "unlock_device", "get_class"]
43
44
45def remount(device):
46    device.enable_hdc_root()
47    cmd = "target mount" \
48        if device.usb_type == DeviceConnectorType.hdc else "remount"
49    device.connector_command(cmd)
50    device.execute_shell_command("remount")
51    device.execute_shell_command("mount -o rw,remount /cust")
52    device.execute_shell_command("mount -o rw,remount /product")
53    device.execute_shell_command("mount -o rw,remount /hw_product")
54    device.execute_shell_command("mount -o rw,remount /version")
55    device.execute_shell_command("mount -o rw,remount /%s" % "system")
56
57
58def get_class(junit_paras, prefix_char, para_name):
59    if not junit_paras.get(para_name):
60        return ""
61
62    result = ""
63    if prefix_char == "-e":
64        result = " %s class " % prefix_char
65    elif prefix_char == "--":
66        result = " %sclass " % prefix_char
67    elif prefix_char == "-s":
68        result = " %s class " % prefix_char
69    test_items = []
70    for test in junit_paras.get(para_name):
71        test_item = test.split("#")
72        if len(test_item) == 1 or len(test_item) == 2:
73            test_item = "%s" % test
74            test_items.append(test_item)
75        elif len(test_item) == 3:
76            test_item = "%s#%s" % (test_item[1], test_item[2])
77            test_items.append(test_item)
78        else:
79            raise ParamError("The parameter %s %s is error" % (
80                             prefix_char, para_name))
81    if not result:
82        LOG.debug("There is unsolved prefix char: %s ." % prefix_char)
83    return result + ",".join(test_items)
84
85
86def junit_para_parse(device, junit_paras, prefix_char="-e"):
87    """To parse the para of junit
88    Args:
89        device: the device running
90        junit_paras: the para dict of junit
91        prefix_char: the prefix char of parsed cmd
92    Returns:
93        the new para using in a command like -e testFile xxx
94        -e coverage true...
95    """
96    ret_str = []
97    path = "/%s/%s/%s" % ("data", "local", "ajur")
98    include_file = "%s/%s" % (path, "includes.txt")
99    exclude_file = "%s/%s" % (path, "excludes.txt")
100
101    if not isinstance(junit_paras, dict):
102        LOG.warning("The para of junit is not the dict format as required")
103        return ""
104    # Disable screen keyguard
105    disable_key_guard = junit_paras.get('disable-keyguard')
106    if not disable_key_guard or disable_key_guard[0].lower() != 'false':
107        disable_keyguard(device)
108
109    for para_name in junit_paras.keys():
110        path = "/%s/%s/%s/" % ("data", "local", "ajur")
111        if para_name.strip() == 'test-file-include-filter':
112            for file_name in junit_paras[para_name]:
113                device.push_file(file_name, include_file)
114                device.execute_shell_command(
115                    'chown -R shell:shell %s' % path)
116            ret_str.append(" ".join([prefix_char, 'testFile', include_file]))
117        elif para_name.strip() == "test-file-exclude-filter":
118            for file_name in junit_paras[para_name]:
119                device.push_file(file_name, exclude_file)
120                device.execute_shell_command(
121                    'chown -R shell:shell %s' % path)
122            ret_str.append(" ".join([prefix_char, 'notTestFile',
123                                     exclude_file]))
124        elif para_name.strip() == "test" or para_name.strip() == "class":
125            result = get_class(junit_paras, prefix_char, para_name.strip())
126            ret_str.append(result)
127        elif para_name.strip() == "include-annotation":
128            ret_str.append(" ".join([prefix_char, "annotation",
129                                     ",".join(junit_paras[para_name])]))
130        elif para_name.strip() == "exclude-annotation":
131            ret_str.append(" ".join([prefix_char, "notAnnotation",
132                                     ",".join(junit_paras[para_name])]))
133        else:
134            ret_str.append(" ".join([prefix_char, para_name,
135                                     ",".join(junit_paras[para_name])]))
136
137    return " ".join(ret_str)
138
139
140def gtest_para_parse(gtest_paras, runner, request):
141    """To parse the para of gtest
142    Args:
143        gtest_paras: the para dict of gtest
144    Returns:
145        the new para using in gtest
146    """
147    ret_str = []
148    if not isinstance(gtest_paras, dict):
149        LOG.warning("The para of gtest is not the dict format as required")
150        return ""
151
152    for para in gtest_paras.keys():
153        if para.strip() == 'test-file-include-filter':
154            case_list = []
155            files = gtest_paras.get(para)
156            for case_file in files:
157                flags = os.O_RDONLY
158                modes = stat.S_IWUSR | stat.S_IRUSR
159                with os.fdopen(os.open(case_file, flags, modes),
160                               "r") as file_desc:
161                    case_list.extend(file_desc.read().splitlines())
162
163            runner.add_instrumentation_arg("gtest_filter", ":".join(case_list))
164
165        if para.strip() == 'all-test-file-exclude-filter':
166            json_file_list = gtest_paras.get("all-test-file-exclude-filter")
167            if json_file_list:
168                flags = os.O_RDONLY
169                modes = stat.S_IWUSR | stat.S_IRUSR
170                with os.fdopen(os.open(json_file_list[0], flags, modes),
171                               "r") as file_handler:
172                    json_data = json.load(file_handler)
173                exclude_list = json_data.get(DeviceTestType.cpp_test, [])
174                for exclude in exclude_list:
175                    if request.get_module_name() in exclude:
176                        case_list = exclude.get(request.get_module_name())
177                        runner.add_instrumentation_arg(
178                            "gtest_filter",
179                            "%s%s" % ("-", ":".join(case_list)))
180    return " ".join(ret_str)
181
182
183def reset_junit_para(junit_para_str, prefix_char="-e", ignore_keys=None):
184    if not ignore_keys and not isinstance(ignore_keys, list):
185        ignore_keys = ["class", "test"]
186    lines = junit_para_str.split("%s " % prefix_char)
187    normal_lines = []
188    for line in lines:
189        line = line.strip()
190        if line:
191            items = line.split()
192            if items[0].strip() in ignore_keys:
193                continue
194            normal_lines.append("{} {}".format(prefix_char, line))
195    return " ".join(normal_lines)
196
197
198def get_install_args(device, app_name, original_args=None):
199    """To obtain all the args of app install
200    Args:
201        original_args: the argus configure in .config file
202        device : the device will be installed app
203        app_name : the name of the app which will be installed
204    Returns:
205        All the args
206    """
207    if original_args is None:
208        original_args = []
209    new_args = original_args[:]
210    try:
211        sdk_version = device.get_property("ro.build.version.sdk")
212        if int(sdk_version) > TARGET_SDK_VERSION:
213            new_args.append("-g")
214    except TypeError as type_error:
215        LOG.error("Obtain the sdk version failed with exception {}".format(
216            type_error))
217    except ValueError as value_error:
218        LOG.error("Obtain the sdk version failed with exception {}".format(
219            value_error))
220    if app_name.endswith(".apex"):
221        new_args.append("--apex")
222    return " ".join(new_args)
223
224
225def get_app_name_by_tool(app_path, paths):
226    """To obtain the app name by using tool
227    Args:
228        app_path: the path of app
229        paths:
230    Returns:
231        The Pkg Name if found else None
232    """
233    rex = "^package:\\s+name='(.*?)'.*$"
234    aapt_tool_name = "aapt.exe" if os.name == "nt" else "aapt"
235    if app_path:
236        proc_timer = None
237        try:
238            tool_file = get_file_absolute_path(os.path.join(
239                "tools", aapt_tool_name), paths)
240            LOG.debug("Aapt file is %s" % tool_file)
241
242            if platform.system() == "Linux":
243                if not oct(os.stat(tool_file).st_mode)[-3:] == "755":
244                    os.chmod(tool_file, FilePermission.mode_755)
245
246            cmd = [tool_file, "dump", "badging", app_path]
247            timeout = 300
248            LOG.info("Execute command %s with %s" % (" ".join(cmd), timeout))
249
250            sub_process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
251                                           stderr=subprocess.PIPE)
252            proc_timer = Timer(timeout, timeout_callback, [sub_process])
253            proc_timer.start()
254            # The package name must be return in first line
255            output = sub_process.stdout.readline()
256            error = sub_process.stderr.readline()
257            LOG.debug("The output of aapt is {}".format(output))
258            if error:
259                LOG.debug("The error of aapt is {}".format(error))
260            if output:
261                pkg_match = re.match(rex, output.decode("utf8", 'ignore'))
262                if pkg_match is not None:
263                    LOG.info(
264                        "Obtain the app name {} successfully by using "
265                        "aapt".format(pkg_match.group(1)))
266                    return pkg_match.group(1)
267            return None
268        except (FileNotFoundError, ParamError) as error:
269            LOG.debug("Aapt error: %s", error.args)
270            return None
271        finally:
272            if proc_timer:
273                proc_timer.cancel()
274    else:
275        LOG.error("get_app_name_by_tool error.")
276        return None
277
278
279def timeout_callback(proc):
280    try:
281        LOG.error("Error: execute command timeout.")
282        LOG.error(proc.pid)
283        if platform.system() != "Windows":
284            os.killpg(proc.pid, signal.SIGKILL)
285        else:
286            subprocess.call(
287                ["C:\\Windows\\System32\\taskkill", "/F", "/T", "/PID",
288                 str(proc.pid)], shell=False)
289    except (FileNotFoundError, KeyboardInterrupt, AttributeError) as error:
290        LOG.exception("Timeout callback exception: %s" % error, exc_info=False)
291
292
293def disable_keyguard(device):
294    unlock_screen(device)
295    unlock_device(device)
296
297
298def unlock_screen(device):
299    device.execute_shell_command("svc power stayon true")
300    time.sleep(1)
301
302
303def unlock_device(device):
304    device.execute_shell_command("input keyevent 82")
305    time.sleep(1)
306    device.execute_shell_command("wm dismiss-keyguard")
307    time.sleep(1)
308