• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import re
21import stat
22import json
23import time
24import platform
25import subprocess
26import signal
27from threading import Timer
28
29from _core.constants import DeviceTestType
30from _core.constants import FilePermission
31from _core.constants import DeviceConnectorType
32from _core.error import ErrorMessage
33from _core.exception import ParamError
34from _core.logger import platform_logger
35from _core.utils import get_file_absolute_path
36
37LOG = platform_logger("Kit")
38
39TARGET_SDK_VERSION = 22
40
41__all__ = ["get_app_name_by_tool", "junit_para_parse", "gtest_para_parse",
42           "get_install_args", "reset_junit_para", "remount", "disable_keyguard",
43           "timeout_callback", "unlock_screen", "unlock_device", "get_class",
44           "check_device_ohca"]
45
46
47def remount(device):
48    cmd = "target mount" \
49        if device.usb_type == DeviceConnectorType.hdc else "remount"
50    device.connector_command(cmd)
51    device.execute_shell_command("remount")
52    device.execute_shell_command("mount -o rw,remount /")
53    device.execute_shell_command("mount -o rw,remount /sys_prod")
54    device.execute_shell_command("mount -o rw,remount /chip_prod")
55    device.execute_shell_command("mount -o rw,remount /preload")
56    device.execute_shell_command("mount -o rw,remount /patch_hw")
57    device.execute_shell_command("mount -o rw,remount /vendor")
58    device.execute_shell_command("mount -o rw,remount /cust")
59    device.execute_shell_command("mount -o rw,remount /product")
60    device.execute_shell_command("mount -o rw,remount /hw_product")
61    device.execute_shell_command("mount -o rw,remount /version")
62    device.execute_shell_command("mount -o rw,remount /system")
63    device.connector_command("target mount")
64
65
66def get_class(junit_paras, prefix_char, para_name):
67    if not junit_paras.get(para_name):
68        return ""
69
70    result = ""
71    if prefix_char == "-e":
72        result = " {} class ".format(prefix_char)
73    elif prefix_char == "--":
74        result = " {} class ".format(prefix_char)
75    elif prefix_char == "-s":
76        result = " {} class ".format(prefix_char)
77    test_items = []
78    for test in junit_paras.get(para_name):
79        test_item = test.split("#")
80        if len(test_item) == 1 or len(test_item) == 2:
81            test_item = "{}".format(test)
82            test_items.append(test_item)
83        elif len(test_item) == 3:
84            test_item = "{}#{}".format(test_item[1], test_item[2])
85            test_items.append(test_item)
86        else:
87            raise ParamError(ErrorMessage.Common.Code_0101031.format(prefix_char, para_name))
88    if not result:
89        LOG.debug("There is unsolved prefix char: {} .".format(prefix_char))
90    return result + ",".join(test_items)
91
92
93def junit_para_parse(device, junit_paras, prefix_char="-e"):
94    """To parse the para of junit
95    Args:
96        device: the device running
97        junit_paras: the para dict of junit
98        prefix_char: the prefix char of parsed cmd
99    Returns:
100        the new para using in a command like -e testFile xxx
101        -e coverage true...
102    """
103    ret_str = []
104    path = "/{}/{}/{}".format("data", "local", "ajur")
105    include_file = "{}/{}".format(path, "includes.txt")
106    exclude_file = "{}/{}".format(path, "excludes.txt")
107
108    if not isinstance(junit_paras, dict):
109        LOG.warning("The para of junit is not the dict format as required")
110        return ""
111    # Disable screen keyguard
112    disable_key_guard = junit_paras.get('disable-keyguard')
113    if not disable_key_guard or disable_key_guard[0].lower() != 'false':
114        disable_keyguard(device)
115
116    for key, value in junit_paras.items():
117        # value is a list object
118        para_name = key.strip()
119        path = "/{}/{}/{}/".format("data", "local", "ajur")
120        if para_name == "test-file-include-filter":
121            for file_name in value:
122                device.push_file(file_name, include_file)
123                device.execute_shell_command(
124                    'chown -R shell:shell {}'.format(path))
125            ret_str.append(" ".join([prefix_char, "testFile", include_file]))
126        elif para_name == "test-file-exclude-filter":
127            for file_name in value:
128                device.push_file(file_name, exclude_file)
129                device.execute_shell_command(
130                    'chown -R shell:shell {}'.format(path))
131            ret_str.append(" ".join([prefix_char, "notTestFile", exclude_file]))
132        elif para_name == "test" or para_name == "class":
133            result = get_class(junit_paras, prefix_char, para_name)
134            ret_str.append(result)
135        elif para_name == "include-annotation":
136            ret_str.append(" ".join([prefix_char, "annotation", ",".join(value)]))
137        elif para_name == "exclude-annotation":
138            ret_str.append(" ".join([prefix_char, "notAnnotation", ",".join(value)]))
139
140    return " ".join(ret_str)
141
142
143def get_include_tests(para_datas, test_types, runner):
144    case_list = []
145    if test_types == "class":
146        case_list = para_datas
147    else:
148        for case_file in para_datas:
149            flags = os.O_RDONLY
150            modes = stat.S_IWUSR | stat.S_IRUSR
151            with os.fdopen(os.open(case_file, flags, modes), "r") as file_desc:
152                case_list.extend(file_desc.read().splitlines())
153    runner.add_instrumentation_arg("gtest_filter", ":".join(case_list).replace("#", "."))
154
155
156def get_all_test_include(para_datas, test_types, runner, request):
157    case_list = []
158    if test_types == "notClass":
159        case_list = para_datas
160    else:
161        if para_datas:
162            flags = os.O_RDONLY
163            modes = stat.S_IWUSR | stat.S_IRUSR
164            with os.fdopen(os.open(para_datas[0], flags, modes), "r") as file_handler:
165                json_data = json.load(file_handler)
166            exclude_list = json_data.get(DeviceTestType.cpp_test, [])
167            for exclude in exclude_list:
168                if request.get_module_name() in exclude:
169                    temp = exclude.get(request.get_module_name())
170                    case_list.extend(temp)
171    runner.add_instrumentation_arg("gtest_filter", "{}{}".format("-", ":".join(case_list)).replace("#", "."))
172
173
174def gtest_para_parse(gtest_paras, runner, request):
175    """To parse the para of gtest
176    Args:
177        gtest_paras: the para dict of gtest
178    Returns:
179        the new para using in gtest
180    """
181    if not isinstance(gtest_paras, dict):
182        LOG.warning("The para of gtest is not the dict format as required")
183        return ""
184    for para in gtest_paras.keys():
185        test_types = para.strip()
186        para_datas = gtest_paras.get(para)
187        if test_types in ["test-file-include-filter", "class"]:
188            get_include_tests(para_datas, test_types, runner)
189        elif test_types in ["all-test-file-exclude-filter", "notClass"]:
190            get_all_test_include(para_datas, test_types, runner, request)
191    return ""
192
193
194def reset_junit_para(junit_para_str, prefix_char="-e", ignore_keys=None):
195    if not ignore_keys and not isinstance(ignore_keys, list):
196        ignore_keys = ["class", "test"]
197    lines = junit_para_str.split("{} ".format(prefix_char))
198    normal_lines = []
199    for line in lines:
200        line = line.strip()
201        if line:
202            items = line.split()
203            if items[0].strip() in ignore_keys:
204                continue
205            normal_lines.append("{} {}".format(prefix_char, line))
206    return " ".join(normal_lines)
207
208
209def get_install_args(device, app_name, original_args=None):
210    """To obtain all the args of app install
211    Args:
212        original_args: the argus configure in .config file
213        device : the device will be installed app
214        app_name : the name of the app which will be installed
215    Returns:
216        All the args
217    """
218    if original_args is None:
219        original_args = []
220    new_args = original_args[:]
221    try:
222        sdk_version = device.get_property("ro.build.version.sdk")
223        if int(sdk_version) > TARGET_SDK_VERSION:
224            new_args.append("-g")
225    except TypeError as type_error:
226        LOG.error("Obtain the sdk version failed with exception {}".format(
227            type_error))
228    except ValueError as value_error:
229        LOG.error("Obtain the sdk version failed with exception {}".format(
230            value_error))
231    if app_name.endswith(".apex"):
232        new_args.append("--apex")
233    return " ".join(new_args)
234
235
236def get_app_name_by_tool(app_path, paths):
237    """To obtain the app name by using tool
238    Args:
239        app_path: the path of app
240        paths:
241    Returns:
242        The Pkg Name if found else None
243    """
244    rex = "^package:\\s+name='(.*?)'.*$"
245    if platform.system() == "Windows":
246        aapt_tool_name = "aapt.exe"
247    elif platform.system() == "Linux":
248        aapt_tool_name = "aapt"
249    else:
250        aapt_tool_name = "aapt_mac"
251    if app_path:
252        proc_timer = None
253        try:
254            tool_file = get_file_absolute_path(aapt_tool_name, paths)
255            LOG.debug("Aapt file is {}".format(tool_file))
256
257            if platform.system() == "Linux" or platform.system() == "Darwin":
258                if not oct(os.stat(tool_file).st_mode)[-3:] == "755":
259                    os.chmod(tool_file, FilePermission.mode_755)
260
261            cmd = [tool_file, "dump", "badging", app_path]
262            timeout = 300
263            LOG.info("Execute command {} with {}".format(" ".join(cmd), timeout))
264
265            sub_process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
266                                           stderr=subprocess.PIPE)
267            proc_timer = Timer(timeout, timeout_callback, [sub_process])
268            proc_timer.start()
269            # The package name must be return in first line
270            output = sub_process.stdout.readline()
271            error = sub_process.stderr.readline()
272            LOG.debug("The output of aapt is {}".format(output))
273            if error:
274                LOG.debug("The error of aapt is {}".format(error))
275            if output:
276                pkg_match = re.match(rex, output.decode("utf8", 'ignore'))
277                if pkg_match is not None:
278                    LOG.info(
279                        "Obtain the app name {} successfully by using "
280                        "aapt".format(pkg_match.group(1)))
281                    return pkg_match.group(1)
282            return None
283        except (FileNotFoundError, ParamError) as error:
284            LOG.debug("Aapt error: {}".format(error.args))
285            return None
286        finally:
287            if proc_timer:
288                proc_timer.cancel()
289    else:
290        LOG.error("get_app_name_by_tool error.")
291        return None
292
293
294def timeout_callback(proc):
295    try:
296        LOG.error("Error: execute command timeout.")
297        LOG.error(proc.pid)
298        if platform.system() != "Windows":
299            os.killpg(proc.pid, signal.SIGKILL)
300        else:
301            subprocess.call(
302                ["C:\\Windows\\System32\\taskkill", "/F", "/T", "/PID",
303                 str(proc.pid)], shell=False)
304    except (FileNotFoundError, KeyboardInterrupt, AttributeError) as error:
305        LOG.exception("Timeout callback exception: {}".format(error, exc_info=False))
306
307
308def disable_keyguard(device):
309    unlock_screen(device)
310    unlock_device(device)
311
312
313def unlock_screen(device):
314    device.execute_shell_command("svc power stayon true")
315    time.sleep(1)
316
317
318def unlock_device(device):
319    device.execute_shell_command("input keyevent 82")
320    time.sleep(1)
321    device.execute_shell_command("wm dismiss-keyguard")
322    time.sleep(1)
323
324
325def check_device_ohca(device):
326    return False
327