• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import re
21import subprocess
22import zipfile
23import stat
24import time
25import json
26from dataclasses import dataclass
27from tempfile import TemporaryDirectory
28from tempfile import NamedTemporaryFile
29
30from xdevice import ITestKit
31from xdevice import platform_logger
32from xdevice import Plugin
33from xdevice import ParamError
34from xdevice import get_file_absolute_path
35from xdevice import get_config_value
36from xdevice import exec_cmd
37from xdevice import ConfigConst
38from xdevice import AppInstallError
39from xdevice import convert_serial
40from xdevice import check_path_legal
41from xdevice import modify_props
42from xdevice import get_app_name_by_tool
43from xdevice import remount
44from xdevice import disable_keyguard
45from xdevice import get_class
46
47from ohos.constants import CKit
48from ohos.environment.dmlib import HdcHelper
49from ohos.environment.dmlib import CollectingOutputReceiver
50
51__all__ = ["STSKit", "CommandKit", "PushKit", "PropertyCheckKit", "ShellKit", "WifiKit",
52           "ConfigKit", "AppInstallKit", "ComponentKit", "PermissionKit",
53           "junit_dex_para_parse", "oh_jsunit_para_parse"]
54
55MAX_WAIT_COUNT = 4
56TARGET_SDK_VERSION = 22
57
58LOG = platform_logger("Kit")
59
60
61@Plugin(type=Plugin.TEST_KIT, id=CKit.command)
62class CommandKit(ITestKit):
63
64    def __init__(self):
65        self.run_command = []
66        self.teardown_command = []
67        self.paths = ""
68
69    def __check_config__(self, config):
70        self.paths = get_config_value('paths', config)
71        self.teardown_command = get_config_value('teardown', config)
72        self.run_command = get_config_value('shell', config)
73
74    def __setup__(self, device, **kwargs):
75        del kwargs
76        LOG.debug("CommandKit setup, device:{}, params:{}".
77                  format(device, self.get_plugin_config().__dict__))
78        if len(self.run_command) == 0:
79            LOG.info("No setup_command to run, skipping!")
80            return
81        for command in self.run_command:
82            self._run_command(command, device)
83
84    def __teardown__(self, device):
85        LOG.debug("CommandKit teardown: device:{}, params:{}".format
86                  (device, self.get_plugin_config().__dict__))
87        if len(self.teardown_command) == 0:
88            LOG.info("No teardown_command to run, skipping!")
89            return
90        for command in self.teardown_command:
91            self._run_command(command, device)
92
93    def _run_command(self, command, device):
94
95        command_type = command.get("name").strip()
96        command_value = command.get("value")
97
98        if command_type == "reboot":
99            device.reboot()
100        elif command_type == "install":
101            LOG.debug("Trying to install package {}".format(command_value))
102            package = get_file_absolute_path(command_value, self.paths)
103            if not package or not os.path.exists(package):
104                LOG.error(
105                    "The package {} to be installed does not exist".format(
106                        package))
107
108            result = device.install_package(package)
109            if not result.startswith("Success"):
110                raise AppInstallError(
111                    "Failed to install %s on %s. Reason:%s" %
112                    (package, device.__get_serial__(), result))
113            LOG.debug("Installed package finished {}".format(package))
114        elif command_type == "uninstall":
115            LOG.debug("Trying to uninstall package {}".format(command_value))
116            package = get_file_absolute_path(command_value, self.paths)
117            app_name = get_app_name_by_tool(package, self.paths)
118            if app_name:
119                result = device.uninstall_package(app_name)
120                if not result.startswith("Success"):
121                    LOG.error("error uninstalling package %s %s" %
122                              (device.__get_serial__(), result))
123            LOG.debug("uninstall package finished {}".format(app_name))
124        elif command_type == "pull":
125            files = command_value.split("->")
126            remote = files[0].strip()
127            local = files[1].strip()
128            device.pull_file(remote, local)
129        elif command_type == "push":
130            files = command_value.split("->")
131            src = files[0].strip()
132            dst = files[1].strip() if files[1].strip().startswith("/") else \
133                files[1].strip() + Props.dest_root
134            LOG.debug(
135                "Trying to push the file local {} to remote{}".format(
136                    src, dst))
137            real_src_path = get_file_absolute_path(src, self.paths)
138            if not real_src_path or not os.path.exists(real_src_path):
139                LOG.error(
140                    "The src file {} to be pushed does not exist".format(src))
141            device.push_file(real_src_path, dst)
142            LOG.debug("Push file finished from {} to {}".format(src, dst))
143        elif command_type == "shell":
144            device.execute_shell_command(command_value)
145
146
147@Plugin(type=Plugin.TEST_KIT, id=CKit.sts)
148class STSKit(ITestKit):
149    def __init__(self):
150        self.sts_version = ""
151        self.throw_error = ""
152
153    def __check_config__(self, config):
154        self.sts_version = get_config_value('sts-version', config)
155        self.throw_error = get_config_value('throw-error', config)
156        if len(self.sts_version) < 1:
157            raise TypeError(
158                "The sts_version: {} is invalid".format(self.sts_version))
159
160    def __setup__(self, device, **kwargs):
161        del kwargs
162        LOG.debug("STSKit setup, device:{}, params:{}".
163                  format(device, self.get_plugin_config().__dict__))
164        device_spl = device.get_property(Props.security_patch)
165        if device_spl is None or device_spl == "":
166            LOG.error("The device security {} is invalid".format(device_spl))
167            raise ParamError(
168                "The device security patch version {} is invalid".format(
169                    device_spl))
170        rex = '^[a-zA-Z\\d\\.]+_([\\d]+-[\\d]+)$'
171        match = re.match(rex, self.sts_version)
172        if match is None:
173            LOG.error("The sts version {} does match the rule".format(
174                self.sts_version))
175            raise ParamError("The sts version {} does match the rule".format(
176                self.sts_version))
177        sts_version_date_user = match.group(1).join("-01")
178        sts_version_date_kernel = match.group(1).join("-05")
179        if device_spl in [sts_version_date_user, sts_version_date_kernel]:
180            LOG.info(
181                "The device SPL version {} match the sts version {}".format(
182                    device_spl, self.sts_version))
183        else:
184            err_msg = "The device SPL version {} does not match the sts " \
185                      "version {}".format(device_spl, self.sts_version)
186            LOG.error(err_msg)
187            raise ParamError(err_msg)
188
189    def __teardown__(self, device):
190        LOG.debug("STSKit teardown: device:{}, params:{}".format
191                  (device, self.get_plugin_config().__dict__))
192
193
194@Plugin(type=Plugin.TEST_KIT, id=CKit.push)
195class PushKit(ITestKit):
196    def __init__(self):
197        self.pre_push = ""
198        self.push_list = ""
199        self.post_push = ""
200        self.is_uninstall = ""
201        self.paths = ""
202        self.pushed_file = []
203        self.abort_on_push_failure = True
204        self.teardown_push = ""
205
206    def __check_config__(self, config):
207        self.pre_push = get_config_value('pre-push', config)
208        self.push_list = get_config_value('push', config)
209        self.post_push = get_config_value('post-push', config)
210        self.teardown_push = get_config_value('teardown-push', config)
211        self.is_uninstall = get_config_value('uninstall', config,
212                                             is_list=False, default=True)
213        self.abort_on_push_failure = get_config_value(
214            'abort-on-push-failure', config, is_list=False, default=True)
215        if isinstance(self.abort_on_push_failure, str):
216            self.abort_on_push_failure = False if \
217                self.abort_on_push_failure.lower() == "false" else True
218
219        self.paths = get_config_value('paths', config)
220        self.pushed_file = []
221
222    def __setup__(self, device, **kwargs):
223        del kwargs
224        LOG.debug("PushKit setup, device:{}".format(device.device_sn))
225        for command in self.pre_push:
226            run_command(device, command)
227        dst = None
228        remount(device)
229        for push_info in self.push_list:
230            files = re.split('->|=>', push_info)
231            if len(files) != 2:
232                LOG.error("The push spec is invalid: {}".format(push_info))
233                continue
234            src = files[0].strip()
235            dst = files[1].strip() if files[1].strip().startswith("/") else \
236                files[1].strip() + Props.dest_root
237            LOG.debug(
238                "Trying to push the file local {} to remote {}".format(src,
239                                                                       dst))
240
241            try:
242                real_src_path = get_file_absolute_path(src, self.paths)
243            except ParamError as error:
244                if self.abort_on_push_failure:
245                    raise error
246                else:
247                    LOG.warning(error, error_no=error.error_no)
248                    continue
249            # hdc don't support push directory now
250            if os.path.isdir(real_src_path):
251                device.connector_command("shell mkdir {}".format(dst))
252                for root, _, files in os.walk(real_src_path):
253                    for file in files:
254                        device.push_file("{}".format(os.path.join(root, file)),
255                                         "{}".format(dst))
256                        LOG.debug(
257                            "Push file finished from {} to {}".format(
258                                os.path.join(root, file), dst))
259                        self.pushed_file.append(os.path.join(dst, file))
260            else:
261                if device.is_directory(dst):
262                    dst = os.path.join(dst, os.path.basename(real_src_path))
263                    if dst.find("\\") > -1:
264                        dst_paths = dst.split("\\")
265                        dst = "/".join(dst_paths)
266                device.push_file("{}".format(real_src_path),
267                                 "{}".format(dst))
268                LOG.debug("Push file finished from {} to {}".format(src, dst))
269                self.pushed_file.append(dst)
270        for command in self.post_push:
271            run_command(device, command)
272        return self.pushed_file, dst
273
274    def add_pushed_dir(self, src, dst):
275        for root, _, files in os.walk(src):
276            for file_path in files:
277                self.pushed_file.append(
278                    os.path.join(root, file_path).replace(src, dst))
279
280    def __teardown__(self, device):
281        LOG.debug("PushKit teardown: device:{}".format(device.device_sn))
282        for command in self.teardown_push:
283            run_command(device, command)
284        if self.is_uninstall:
285            remount(device)
286            for file_name in self.pushed_file:
287                LOG.debug("Trying to remove file {}".format(file_name))
288                file_name = file_name.replace("\\", "/")
289
290                for _ in range(
291                        Props.trying_remove_maximum_times):
292                    collect_receiver = CollectingOutputReceiver()
293                    file_name = check_path_legal(file_name)
294                    device.execute_shell_command("rm -rf {}".format(
295                        file_name), receiver=collect_receiver,
296                        output_flag=False)
297                    if not collect_receiver.output:
298                        LOG.debug(
299                            "Removed file {} successfully".format(file_name))
300                        break
301                    else:
302                        LOG.error("Removed file {} successfully".
303                                  format(collect_receiver.output))
304                else:
305                    LOG.error("Failed to remove file {}".format(file_name))
306
307    def __add_pushed_file__(self, device, src, dst):
308        if device.is_directory(dst):
309            dst = dst + os.path.basename(src) if dst.endswith(
310                "/") else dst + "/" + os.path.basename(src)
311        self.pushed_file.append(dst)
312
313    def __add_dir_pushed_files__(self, device, src, dst):
314        if device.file_exist(device, dst):
315            for _, dirs, files in os.walk(src):
316                for file_path in files:
317                    if dst.endswith("/"):
318                        dst = "%s%s" % (dst, os.path.basename(file_path))
319                    else:
320                        dst = "%s/%s" % (dst, os.path.basename(file_path))
321                    self.pushed_file.append(dst)
322                for dir_name in dirs:
323                    self.__add_dir_pushed_files__(device, dir_name, dst)
324        else:
325            self.pushed_file.append(dst)
326
327
328@Plugin(type=Plugin.TEST_KIT, id=CKit.propertycheck)
329class PropertyCheckKit(ITestKit):
330    def __init__(self):
331        self.prop_name = ""
332        self.expected_value = ""
333        self.throw_error = ""
334
335    def __check_config__(self, config):
336        self.prop_name = get_config_value('property-name', config,
337                                          is_list=False)
338        self.expected_value = get_config_value('expected-value', config,
339                                               is_list=False)
340        self.throw_error = get_config_value('throw-error', config,
341                                            is_list=False)
342
343    def __setup__(self, device, **kwargs):
344        del kwargs
345        LOG.debug("PropertyCheckKit setup, device:{}".format(device.device_sn))
346        if not self.prop_name:
347            LOG.warning("The option of property-name not setting")
348            return
349        prop_value = device.get_property(self.prop_name)
350        if not prop_value:
351            LOG.warning(
352                "The property {} not found on device, cannot check the value".
353                    format(self.prop_name))
354            return
355
356        if prop_value != self.expected_value:
357            msg = "The value found for property {} is {}, not same with the " \
358                  "expected {}".format(self.prop_name, prop_value,
359                                       self.expected_value)
360            LOG.warning(msg)
361            if self.throw_error and self.throw_error.lower() == 'true':
362                raise Exception(msg)
363
364    @classmethod
365    def __teardown__(cls, device):
366        LOG.debug("PropertyCheckKit teardown: device:{}".format(
367            device.device_sn))
368
369
370@Plugin(type=Plugin.TEST_KIT, id=CKit.shell)
371class ShellKit(ITestKit):
372    def __init__(self):
373        self.command_list = []
374        self.tear_down_command = []
375        self.paths = None
376
377    def __check_config__(self, config):
378        self.command_list = get_config_value('run-command', config)
379        self.tear_down_command = get_config_value('teardown-command', config)
380        self.tear_down_local_command = get_config_value('teardown-localcommand', config)
381        self.paths = get_config_value('paths', config)
382
383    def __setup__(self, device, **kwargs):
384        del kwargs
385        LOG.debug("ShellKit setup, device:{}".format(device.device_sn))
386        if len(self.command_list) == 0:
387            LOG.info("No setup_command to run, skipping!")
388            return
389        for command in self.command_list:
390            run_command(device, command)
391
392    def __teardown__(self, device):
393        LOG.debug("ShellKit teardown: device:{}".format(device.device_sn))
394        if len(self.tear_down_command) == 0:
395            LOG.info("No teardown_command to run, skipping!")
396        else:
397            for command in self.tear_down_command:
398                run_command(device, command)
399        if len(self.tear_down_local_command) == 0:
400            LOG.info("No teardown-localcommand to run, skipping!")
401        else:
402            for command in self.tear_down_local_command:
403                subprocess.run(command)
404
405
406@Plugin(type=Plugin.TEST_KIT, id=CKit.wifi)
407class WifiKit(ITestKit):
408    def __init__(self):
409        self.certfilename = ""
410        self.certpassword = ""
411        self.wifiname = ""
412        self.paths = ""
413
414    def __check_config__(self, config):
415        self.certfilename = get_config_value(
416            'certfilename', config, False,
417            default=None)
418        self.certpassword = get_config_value(
419            'certpassword', config, False,
420            default=None)
421        self.wifiname = get_config_value(
422            'wifiname', config, False,
423            default=None)
424        self.paths = get_config_value('paths', config)
425
426    def __setup__(self, device, **kwargs):
427        request = kwargs.get("request", None)
428        if not request:
429            LOG.error("WifiKit need input request")
430            return
431        testargs = request.get("testargs", {})
432        self.certfilename = \
433            testargs.pop("certfilename", [self.certfilename])[0]
434        self.wifiname = \
435            testargs.pop("wifiname", [self.wifiname])[0]
436        self.certpassword = \
437            testargs.pop("certpassword", [self.certpassword])[0]
438        del kwargs
439        LOG.debug("WifiKit setup, device:{}".format(device.device_sn))
440
441        try:
442            wifi_app_path = get_file_absolute_path(
443                Props.Paths.service_wifi_app_path, self.paths)
444        except ParamError as _:
445            wifi_app_path = None
446
447        if wifi_app_path is None:
448            LOG.error("The resource wifi app file does not exist!")
449            return
450
451        try:
452            pfx_path = get_file_absolute_path(
453                "tools/wifi/%s" % self.certfilename
454            ) if self.certfilename else None
455        except ParamError as _:
456            pfx_path = None
457
458        if pfx_path is None:
459            LOG.error("The resource wifi pfx file does not exist!")
460            return
461        pfx_dest_path = \
462            "/storage/emulated/0/%s" % self.certfilename
463        if self.wifiname is None:
464            LOG.error("The wifi name is not given!")
465            return
466        if self.certpassword is None:
467            LOG.error("The wifi password is not given!")
468            return
469
470        device.install_package(wifi_app_path, command="-r")
471        device.push_file(pfx_path, pfx_dest_path)
472        device.execute_shell_command("svc wifi enable")
473        for _ in range(Props.maximum_connect_wifi_times):
474            connect_wifi_cmd = Props.connect_wifi_cmd % (
475                pfx_dest_path,
476                self.certpassword,
477                self.wifiname
478            )
479            if device.execute_shell_command(connect_wifi_cmd):
480                LOG.info("Connect wifi successfully")
481                break
482        else:
483            LOG.error("Connect wifi failed")
484
485    @classmethod
486    def __teardown__(cls, device):
487        LOG.debug("WifiKit teardown: device:{}".format(device.device_sn))
488        LOG.info("Disconnect wifi")
489        device.execute_shell_command("svc wifi disable")
490
491
492@dataclass
493class Props:
494    @dataclass
495    class Paths:
496        system_build_prop_path = "/%s/%s" % ("system", "build.prop")
497        service_wifi_app_path = "tools/wifi/%s" % "Service-wifi.app"
498
499    dest_root = "/%s/%s/" % ("data", "data")
500    mnt_external_storage = "EXTERNAL_STORAGE"
501    trying_remove_maximum_times = 3
502    maximum_connect_wifi_times = 3
503    connect_wifi_cmd = "am instrument -e request \"{module:Wifi, " \
504                       "method:connectWifiByCertificate, params:{'certPath':" \
505                       "'%s'," \
506                       "'certPassword':'%s'," \
507                       "'wifiName':'%s'}}\"  " \
508                       "-w com.xdeviceservice.service/.MainInstrumentation"
509    security_patch = "ro.build.version.security_patch"
510
511
512@Plugin(type=Plugin.TEST_KIT, id=CKit.config)
513class ConfigKit(ITestKit):
514    def __init__(self):
515        self.is_connect_wifi = ""
516        self.is_disconnect_wifi = ""
517        self.wifi_kit = WifiKit()
518        self.min_external_store_space = ""
519        self.is_disable_dialing = ""
520        self.is_test_harness = ""
521        self.is_audio_silent = ""
522        self.is_disable_dalvik_verifier = ""
523        self.build_prop_list = ""
524        self.is_enable_hook = ""
525        self.cust_prop_file = ""
526        self.is_prop_changed = False
527        self.local_system_prop_file = ""
528        self.cust_props = ""
529        self.is_reboot_delay = ""
530        self.is_remount = ""
531        self.local_cust_prop_file = {}
532
533    def __check_config__(self, config):
534        self.is_connect_wifi = get_config_value('connect-wifi', config,
535                                                is_list=False, default=False)
536        self.is_disconnect_wifi = get_config_value(
537            'disconnect-wifi-after-test', config, is_list=False, default=True)
538        self.wifi_kit = WifiKit()
539        self.min_external_store_space = get_config_value(
540            'min-external-store-space', config)
541        self.is_disable_dialing = get_config_value('disable-dialing', config)
542        self.is_test_harness = get_config_value('set-test-harness', config)
543        self.is_audio_silent = get_config_value('audio-silent', config)
544        self.is_disable_dalvik_verifier = get_config_value(
545            'disable-dalvik-verifier', config)
546        self.build_prop_list = get_config_value('build-prop', config)
547        self.cust_prop_file = get_config_value('cust-prop-file', config)
548        self.cust_props = get_config_value('cust-prop', config)
549        self.is_enable_hook = get_config_value('enable-hook', config)
550        self.is_reboot_delay = get_config_value('reboot-delay', config)
551        self.is_remount = get_config_value('remount', config, default=True)
552        self.local_system_prop_file = NamedTemporaryFile(prefix='build',
553                                                         suffix='.prop',
554                                                         delete=False).name
555
556    def __setup__(self, device, **kwargs):
557        del kwargs
558        LOG.debug("ConfigKit setup, device:{}".format(device.device_sn))
559        if self.is_remount:
560            remount(device)
561        self.is_prop_changed = self.modify_system_prop(device)
562        self.is_prop_changed = self.modify_cust_prop(
563            device) or self.is_prop_changed
564
565        keep_screen_on(device)
566        if self.is_enable_hook:
567            pass
568        if self.is_prop_changed:
569            device.reboot()
570
571    def __teardown__(self, device):
572        LOG.debug("ConfigKit teardown: device:{}".format(device.device_sn))
573        if self.is_remount:
574            remount(device)
575        if self.is_connect_wifi and self.is_disconnect_wifi:
576            self.wifi_kit.__teardown__(device)
577        if self.is_prop_changed:
578            device.push_file(self.local_system_prop_file,
579                             Props.Paths.system_build_prop_path)
580            device.execute_shell_command(
581                " ".join(["chmod 644", Props.Paths.system_build_prop_path]))
582            os.remove(self.local_system_prop_file)
583
584            for target_file, temp_file in self.local_cust_prop_file.items():
585                device.push_file(temp_file, target_file)
586                device.execute_shell_command(
587                    " ".join(["chmod 644", target_file]))
588                os.remove(temp_file)
589
590    def modify_system_prop(self, device):
591        prop_changed = False
592        new_props = {}
593        if self.is_disable_dialing:
594            new_props['ro.telephony.disable-call'] = 'true'
595        if self.is_test_harness:
596            new_props['ro.monkey'] = '1'
597            new_props['ro.test_harness'] = '1'
598        if self.is_audio_silent:
599            new_props['ro.audio.silent'] = '1'
600        if self.is_disable_dalvik_verifier:
601            new_props['dalvik.vm.dexopt-flags'] = 'v=n'
602        for prop in self.build_prop_list:
603            if prop is None or prop.find("=") < 0 or len(prop.split("=")) != 2:
604                LOG.warning("The build prop:{} not match the format "
605                            "'key=value'".format(prop))
606                continue
607            new_props[prop.split("=")[0]] = prop.split("=")[1]
608        if new_props:
609            prop_changed = modify_props(device, self.local_system_prop_file,
610                                        Props.Paths.system_build_prop_path,
611                                        new_props)
612        return prop_changed
613
614    def modify_cust_prop(self, device):
615        prop_changed = False
616        cust_files = {}
617        new_props = {}
618        for cust_prop_file in self.cust_prop_file:
619            # the correct format should be "CustName:/cust/prop/absolutepath"
620            if len(cust_prop_file.split(":")) != 2:
621                LOG.error(
622                    "The value %s of option cust-prop-file is incorrect" %
623                    cust_prop_file)
624                continue
625            cust_files[cust_prop_file.split(":")[0]] = \
626                cust_prop_file.split(":")[1]
627        for prop in self.cust_props:
628            # the correct format should be "CustName:key=value"
629            prop_infos = re.split(r'[:|=]', prop)
630            if len(prop_infos) != 3:
631                LOG.error(
632                    "The value {} of option cust-prop is incorrect".format(
633                        prop))
634                continue
635            file_name, key, value = prop_infos
636            if file_name not in cust_files:
637                LOG.error(
638                    "The custName {} must be in cust-prop-file option".format(
639                        file_name))
640                continue
641            props = new_props.setdefault(file_name, {})
642            props[key] = value
643
644        for name in new_props.keys():
645            cust_file = cust_files.get(name)
646            temp_cust_file = NamedTemporaryFile(prefix='cust', suffix='.prop',
647                                                delete=False).name
648            self.local_cust_prop_file[cust_file] = temp_cust_file
649            try:
650                prop_changed = modify_props(device, temp_cust_file, cust_file,
651                                            new_props[name]) or prop_changed
652            except KeyError:
653                LOG.error("Get props error.")
654                continue
655
656        return prop_changed
657
658
659@Plugin(type=Plugin.TEST_KIT, id=CKit.app_install)
660class AppInstallKit(ITestKit):
661    def __init__(self):
662        self.app_list = ""
663        self.app_list_name = ""
664        self.is_clean = ""
665        self.alt_dir = ""
666        self.ex_args = ""
667        self.installed_app = set()
668        self.paths = ""
669        self.is_pri_app = ""
670        self.pushed_hap_file = set()
671        self.env_index_list = None
672
673    def __check_config__(self, options):
674        self.app_list = get_config_value('test-file-name', options)
675        self.app_list_name = get_config_value('test-file-packName', options)
676        self.is_clean = get_config_value('cleanup-apps', options, False)
677        self.alt_dir = get_config_value('alt-dir', options, False)
678        if self.alt_dir and self.alt_dir.startswith("resource/"):
679            self.alt_dir = self.alt_dir[len("resource/"):]
680        self.ex_args = get_config_value('install-arg', options, False)
681        self.installed_app = set()
682        self.paths = get_config_value('paths', options)
683        self.is_pri_app = get_config_value('install-as-privapp', options,
684                                           False, default=False)
685        self.env_index_list = get_config_value('env-index', options)
686
687    def __setup__(self, device, **kwargs):
688        del kwargs
689        LOG.debug("AppInstallKit setup, device:{}".format(device.device_sn))
690        if len(self.app_list) == 0:
691            LOG.info("No app to install, skipping!")
692            return
693        # to disable app install alert
694        device.execute_shell_command("setprop persist.sys.platformautotest 1")
695        for app in self.app_list:
696            if self.alt_dir:
697                app_file = get_file_absolute_path(app, self.paths,
698                                                  self.alt_dir)
699            else:
700                app_file = get_file_absolute_path(app, self.paths)
701            if app_file is None:
702                LOG.error("The app file {} does not exist".format(app))
703                continue
704            if hasattr(device, "is_harmony") and device.is_harmony:
705                device.connector_command("install \"{}\"".format(app_file))
706            else:
707                self.install_hap(device, app_file)
708            self.installed_app.add(app_file)
709
710    def __teardown__(self, device):
711        LOG.debug("AppInstallKit teardown: device:{}".format(device.device_sn))
712        if self.is_clean and str(self.is_clean).lower() == "true":
713            if self.app_list_name and len(self.app_list_name) > 0:
714                for app_name in self.app_list_name:
715                    result = device.uninstall_package(app_name)
716                    if result and (result.startswith("Success") or "successfully" in result):
717                        LOG.debug("uninstalling package Success. result is %s" %
718                                  result)
719                    else:
720                        LOG.warning("Error uninstalling package %s %s" %
721                                    (device.__get_serial__(), result))
722            else:
723                for app in self.installed_app:
724                    app_name = get_app_name(app)
725                    if app_name:
726                        result = device.uninstall_package(app_name)
727                        if result and (result.startswith("Success") or "successfully" in result):
728                            LOG.debug("uninstalling package Success. result is %s" %
729                                      result)
730                        else:
731                            LOG.warning("Error uninstalling package %s %s" %
732                                        (device.__get_serial__(), result))
733                    else:
734                        LOG.warning("Can't find app name for %s" % app)
735        if self.is_pri_app:
736            remount(device)
737        for pushed_file in self.pushed_hap_file:
738            device.execute_shell_command("rm -r %s" % pushed_file)
739
740    def install_hap(self, device, hap_file):
741        if self.is_pri_app:
742            LOG.info("Install hap as privileged app {}".format(hap_file))
743            hap_name = os.path.basename(hap_file).replace(".hap", "")
744            try:
745                with TemporaryDirectory(prefix=hap_name) as temp_dir:
746                    zif_file = zipfile.ZipFile(hap_file)
747                    zif_file.extractall(path=temp_dir)
748                    entry_app = os.path.join(temp_dir, "Entry.app")
749                    push_dest_dir = os.path.join("/system/priv-app/", hap_name)
750                    device.execute_shell_command("rm -rf " + push_dest_dir,
751                                                 output_flag=False)
752                    device.push_file(entry_app, os.path.join(
753                        push_dest_dir + os.path.basename(entry_app)))
754                    device.push_file(hap_file, os.path.join(
755                        push_dest_dir + os.path.basename(hap_file)))
756                    self.pushed_hap_file.add(os.path.join(
757                        push_dest_dir + os.path.basename(hap_file)))
758                    device.reboot()
759            except RuntimeError as exception:
760                msg = "Install hap app failed withe error {}".format(exception)
761                LOG.error(msg)
762                raise Exception(msg)
763            except Exception as exception:
764                msg = "Install hap app failed withe exception {}".format(
765                    exception)
766                LOG.error(msg)
767                raise Exception(msg)
768            finally:
769                zif_file.close()
770        else:
771            push_dest = "/%s" % "sdcard"
772            push_dest = "%s/%s" % (push_dest, os.path.basename(hap_file))
773            device.push_file(hap_file, push_dest)
774            self.pushed_hap_file.add(push_dest)
775            output = device.execute_shell_command("bm install -p " + push_dest)
776            if not output.startswith("Success") and not "successfully" in output:
777                output = output.strip()
778                if "[ERROR_GET_BUNDLE_INSTALLER_FAILED]" not in output.upper():
779                    raise AppInstallError(
780                        "Failed to install %s on %s. Reason:%s" %
781                        (push_dest, device.__get_serial__(), output))
782                else:
783                    LOG.info("'[ERROR_GET_BUNDLE_INSTALLER_FAILED]' occurs, "
784                             "retry install hap")
785                    exec_out = self.retry_install_hap(
786                        device, "bm install -p " + push_dest)
787                    if not exec_out.startswith("Success") and not "successfully" in output:
788                        raise AppInstallError(
789                            "Retry failed,Can't install %s on %s. Reason:%s" %
790                            (push_dest, device.__get_serial__(), exec_out))
791            else:
792                LOG.debug("Install %s success" % push_dest)
793
794    @classmethod
795    def retry_install_hap(cls, device, command):
796        real_command = [HdcHelper.CONNECTOR_NAME, "-t", str(device.device_sn), "-s",
797                        "tcp:%s:%s" % (str(device.host), str(device.port)),
798                        "shell", command]
799        message = "%s execute command: %s" % \
800                  (convert_serial(device.device_sn), " ".join(real_command))
801        LOG.info(message)
802        exec_out = ""
803        for wait_count in range(1, MAX_WAIT_COUNT):
804            LOG.debug("Retry times:%s, wait %ss" %
805                      (wait_count, (wait_count * 10)))
806            time.sleep(wait_count * 10)
807            exec_out = exec_cmd(real_command)
808            if exec_out and exec_out.startswith("Success"):
809                break
810        if not exec_out:
811            exec_out = "System is not in %s" % ["Windows", "Linux", "Darwin"]
812        LOG.info("Retry install hap result is: [%s]" % exec_out.strip())
813        return exec_out
814
815
816@Plugin(type=Plugin.TEST_KIT, id=CKit.component)
817class ComponentKit(ITestKit):
818
819    def __init__(self):
820        self._white_list_file = ""
821        self._white_list = ""
822        self._cap_file = ""
823        self.paths = ""
824        self.cache_subsystem = set()
825        self.cache_part = set()
826
827    def __check_config__(self, config):
828        self._white_list_file =\
829            get_config_value('white-list', config, is_list=False)
830        self._cap_file = get_config_value('cap-file', config, is_list=False)
831        self.paths = get_config_value('paths', config)
832
833    def __setup__(self, device, **kwargs):
834        if hasattr(device, ConfigConst.support_component):
835            return
836        if device.label in ["phone", "watch", "car", "tv", "tablet", "ivi"]:
837            command = "cat %s" % self._cap_file
838            result = device.execute_shell_command(command)
839            part_set = set()
840            subsystem_set = set()
841            if "{" in result:
842                for item in json.loads(result).get("components", []):
843                    part_set.add(item.get("component", ""))
844            subsystems, parts = self.get_white_list()
845            part_set.update(parts)
846            subsystem_set.update(subsystems)
847            setattr(device, ConfigConst.support_component,
848                    (subsystem_set, part_set))
849            self.cache_subsystem.update(subsystem_set)
850            self.cache_part.update(part_set)
851
852    def get_cache(self):
853        return self.cache_subsystem, self.cache_part
854
855    def get_white_list(self):
856        if not self._white_list and self._white_list_file:
857            self._white_list = self._parse_white_list()
858        return self._white_list
859
860    def _parse_white_list(self):
861        subsystem = set()
862        part = set()
863        white_json_file = os.path.normpath(self._white_list_file)
864        if not os.path.isabs(white_json_file):
865            white_json_file = \
866                get_file_absolute_path(white_json_file, self.paths)
867        if os.path.isfile(white_json_file):
868            subsystem_list = list()
869            flags = os.O_RDONLY
870            modes = stat.S_IWUSR | stat.S_IRUSR
871            with os.fdopen(os.open(white_json_file, flags, modes),
872                           "r") as file_content:
873                json_result = json.load(file_content)
874                if "subsystems" in json_result.keys():
875                    subsystem_list.extend(json_result["subsystems"])
876                for subsystem_item_list in subsystem_list:
877                    for key, value in subsystem_item_list.items():
878                        if key == "subsystem":
879                            subsystem.add(value)
880                        elif key == "components":
881                            for component_item in value:
882                                if "component" in component_item.keys():
883                                    part.add(
884                                        component_item["component"])
885
886        return subsystem, part
887
888    def __teardown__(self, device):
889        if hasattr(device, ConfigConst.support_component):
890            setattr(device, ConfigConst.support_component, None)
891        self._white_list_file = ""
892        self._white_list = ""
893        self._cap_file = ""
894        self.cache_subsystem.clear()
895        self.cache_part.clear()
896        self.cache_device.clear()
897
898
899@Plugin(type=Plugin.TEST_KIT , id=CKit.permission)
900class PermissionKit(ITestKit):
901    def __init__(self):
902        self.package_name_list = None
903        self.permission_list = None
904
905    def __check_config__(self, config):
906        self.package_name_list = \
907            get_config_value('package-names', config, True, [])
908        self.permission_list = \
909            get_config_value('permissions', config, True, [])
910
911    def __setup__(self, device, **kwargs):
912        if not self.package_name_list or not self.permission_list:
913            LOG.warning("Please check parameters of permission kit in json")
914            return
915        for index in range(len(self.package_name_list)):
916            cur_name = self.package_name_list[index]
917            token_id = self._get_token_id(device, cur_name)
918            if not token_id:
919                LOG.warning("Not found accessTokenId of '{}'".format(cur_name))
920                continue
921            for permission in self.permission_list[index]:
922                command = "atm perm -g -i {} -p {}".format(token_id,
923                                                           permission)
924                out = device.execute_shell_command(command)
925                LOG.debug("Set permission result: {}".format(out))
926
927    def __teardown__(self, device):
928        pass
929
930    def _get_token_id(self, device, pkg_name):
931        # shell bm dump -n
932        dump_command = "bm dump -n {}".format(pkg_name)
933        content = device.execute_shell_command(dump_command)
934        if not content or not str(content).startswith(pkg_name):
935            return ""
936        content = content[len(pkg_name) + len(":\n"):]
937        dump_dict = json.loads(content)
938        if "userInfo" not in dump_dict.keys():
939            return ""
940        user_info_dict = dump_dict["userInfo"][0]
941        if "accessTokenId" not in user_info_dict.keys():
942            return ""
943        else:
944            return user_info_dict["accessTokenId"]
945
946
947def keep_screen_on(device):
948    device.execute_shell_command("svc power stayon true")
949
950
951def run_command(device, command):
952    LOG.debug("The command:{} is running".format(command))
953    stdout = None
954    if command.strip() == "remount":
955        remount(device)
956    elif command.strip() == "reboot":
957        device.reboot()
958    elif command.strip() == "reboot-delay":
959        pass
960    elif command.strip().endswith("&"):
961        device.execute_shell_in_daemon(command.strip())
962    else:
963        stdout = device.execute_shell_command(command)
964    LOG.debug("Run command result: %s" % (stdout if stdout else ""))
965    return stdout
966
967
968def junit_dex_para_parse(device, junit_paras, prefix_char="--"):
969    """To parse the para of junit
970    Args:
971        device: the device running
972        junit_paras: the para dict of junit
973        prefix_char: the prefix char of parsed cmd
974    Returns:
975        the new para using in a command like -e testFile xxx
976        -e coverage true...
977    """
978    ret_str = []
979    path = "/%s/%s/%s" % ("data", "local", "ajur")
980    include_file = "%s/%s" % (path, "includes.txt")
981    exclude_file = "%s/%s" % (path, "excludes.txt")
982
983    if not isinstance(junit_paras, dict):
984        LOG.warning("The para of junit is not the dict format as required")
985        return ""
986    # Disable screen keyguard
987    disable_key_guard = junit_paras.get('disable-keyguard')
988    if not disable_key_guard or disable_key_guard[0].lower() != 'false':
989        disable_keyguard(device)
990
991    for para_name in junit_paras.keys():
992        path = "/%s/%s/%s/" % ("data", "local", "ajur")
993        if para_name.strip() == 'test-file-include-filter':
994            for file_name in junit_paras[para_name]:
995                device.push_file(file_name, include_file)
996                device.execute_shell_command(
997                    'chown -R shell:shell %s' % path)
998            ret_str.append(prefix_char + " ".join(['testFile', include_file]))
999        elif para_name.strip() == "test-file-exclude-filter":
1000            for file_name in junit_paras[para_name]:
1001                device.push_file(file_name, include_file)
1002                device.execute_shell_command(
1003                    'chown -R shell:shell %s' % path)
1004            ret_str.append(prefix_char + " ".join(['notTestFile',
1005                                                   exclude_file]))
1006        elif para_name.strip() == "test" or para_name.strip() == "class":
1007            result = get_class(junit_paras, prefix_char, para_name.strip())
1008            ret_str.append(result)
1009        elif para_name.strip() == "include-annotation":
1010            ret_str.append(prefix_char + " ".join(
1011                ['annotation', ",".join(junit_paras[para_name])]))
1012        elif para_name.strip() == "exclude-annotation":
1013            ret_str.append(prefix_char + " ".join(
1014                ['notAnnotation', ",".join(junit_paras[para_name])]))
1015        else:
1016            ret_str.append(prefix_char + " ".join(
1017                [para_name, ",".join(junit_paras[para_name])]))
1018
1019    return " ".join(ret_str)
1020
1021
1022def get_app_name(hap_app):
1023    hap_name = os.path.basename(hap_app).replace(".hap", "")
1024    app_name = ""
1025    hap_file_info = None
1026    config_json_file = ""
1027    try:
1028        hap_file_info = zipfile.ZipFile(hap_app)
1029        name_list = ["module.json", "config.json"]
1030        for _, name in enumerate(hap_file_info.namelist()):
1031            if name in name_list:
1032                config_json_file = name
1033                break
1034        config_info = hap_file_info.read(config_json_file).decode('utf-8')
1035        attrs = json.loads(config_info)
1036        if "app" in attrs.keys() and \
1037                "bundleName" in attrs.get("app", dict()).keys():
1038            app_name = attrs["app"]["bundleName"]
1039            LOG.info("Obtain the app name {} from json "
1040                     "successfully".format(app_name))
1041        else:
1042            LOG.debug("Tip: 'app' or 'bundleName' not "
1043                      "in %s.hap/config.json" % hap_name)
1044    except Exception as e:
1045        LOG.error("get app name from hap error: {}".format(e))
1046    finally:
1047        if hap_file_info:
1048            hap_file_info.close()
1049    return app_name
1050
1051
1052def oh_jsunit_para_parse(runner, junit_paras):
1053    junit_paras = dict(junit_paras)
1054    test_type_list = ["function", "performance", "reliability", "security"]
1055    size_list = ["small", "medium", "large"]
1056    level_list = ["0", "1", "2", "3"]
1057    for para_name in junit_paras.keys():
1058        para_name = para_name.strip()
1059        para_values = junit_paras.get(para_name, [])
1060        if para_name == "class":
1061            runner.add_arg(para_name, ",".join(para_values))
1062        elif para_name == "notClass":
1063            runner.add_arg(para_name, ",".join(para_values))
1064        elif para_name == "testType":
1065            if para_values[0] not in test_type_list:
1066                continue
1067            # function/performance/reliability/security
1068            runner.add_arg(para_name, para_values[0])
1069        elif para_name == "size":
1070            if para_values[0] not in size_list:
1071                continue
1072            # size small/medium/large
1073            runner.add_arg(para_name, para_values[0])
1074        elif para_name == "level":
1075            if para_values[0] not in level_list:
1076                continue
1077            # 0/1/2/3/4
1078            runner.add_arg(para_name, para_values[0])
1079        elif para_name == "stress":
1080            runner.add_arg(para_name, para_values[0])
1081