• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18import json
19import re
20import shutil
21import time
22import os
23import threading
24import copy
25import platform
26import subprocess
27import sys
28import tempfile
29import warnings
30from typing import Tuple
31from xml.etree import ElementTree
32
33from xdevice import DeviceOsType
34from xdevice import Variables
35from xdevice import FilePermission
36from xdevice import ParamError
37from xdevice import ProductForm
38from xdevice import ReportException
39from xdevice import IDevice
40from xdevice import platform_logger
41from xdevice import Plugin
42from xdevice import exec_cmd
43from xdevice import ConfigConst
44from xdevice import HdcError
45from xdevice import DeviceAllocationState
46from xdevice import DeviceConnectorType
47from xdevice import TestDeviceState
48from xdevice import AdvanceDeviceOption
49from xdevice import convert_serial
50from xdevice import check_path_legal
51from xdevice import start_standing_subprocess
52from xdevice import stop_standing_subprocess
53from xdevice import DeviceProperties
54from xdevice import get_cst_time
55from xdevice import get_file_absolute_path
56from xdevice import Platform
57from xdevice import AppInstallError
58from xdevice import AgentMode
59from xdevice import check_uitest_version
60from xdevice import ShellCommandUnresponsiveException
61from ohos.environment.dmlib import HdcHelper
62from ohos.environment.dmlib import CollectingOutputReceiver
63from ohos.utils import parse_strings_key_value
64from ohos.error import ErrorMessage
65from ohos.exception import OHOSRpcNotRunningError
66from ohos.exception import OHOSDeveloperModeNotTrueError
67from ohos.exception import OHOSRpcProcessNotFindError
68from ohos.exception import OHOSRpcPortNotFindError
69from ohos.exception import OHOSRpcStartFailedError
70from ohos.exception import HDCFPortError
71
72__all__ = ["Device"]
73TIMEOUT = 300 * 1000
74RETRY_ATTEMPTS = 2
75DEFAULT_UNAVAILABLE_TIMEOUT = 20 * 1000
76BACKGROUND_TIME = 2 * 60 * 1000
77LOG = platform_logger("Device")
78DEVICETEST_HAP_PACKAGE_NAME = "com.ohos.devicetest"
79DEVICE_TEMP_PATH = "/data/local/tmp"
80QUERY_DEVICE_PROP_BIN = "testcases/queryStandard"
81UITEST_NAME = "uitest"
82UITEST_SINGLENESS = "singleness"
83EXTENSION_NAME = "--extension-name"
84UITEST_PATH = "/system/bin/uitest"
85UITEST_SHMF = "/data/app/el2/100/base/{}/cache/shmf".format(DEVICETEST_HAP_PACKAGE_NAME)
86UITEST_COMMAND = "{} start-daemon 0123456789".format(UITEST_PATH)
87NATIVE_CRASH_PATH = "/data/log/faultlog/temp"
88JS_CRASH_PATH = "/data/log/faultlog/faultlogger"
89ROOT_PATH = "/data/log/faultlog"
90KINGKONG_PATH = "/data/local/tmp/kingkongDir"
91LOGLEVEL = ["DEBUG", "INFO", "WARN", "ERROR", "FATAL"]
92HILOG_PATH = "/data/log/hilog"
93SUCCESS_CODE = "0"
94
95
96def perform_device_action(func):
97    def callback_to_outer(device, msg):
98        # callback to decc ui
99        if getattr(device, "callback_method", None):
100            device.callback_method(msg)
101
102    def device_action(self, *args, **kwargs):
103        if not self.get_recover_state():
104            LOG.debug("Device {} {} is false".format(self.device_sn,
105                                                     ConfigConst.recover_state))
106            return None
107        # avoid infinite recursion, such as device reboot
108        abort_on_exception = bool(kwargs.get("abort_on_exception", False))
109        if abort_on_exception:
110            result = func(self, *args, **kwargs)
111            return result
112
113        tmp = int(kwargs.get("retry", RETRY_ATTEMPTS))
114        retry = tmp + 1 if tmp > 0 else 1
115        exception = None
116        for _ in range(retry):
117            try:
118                result = func(self, *args, **kwargs)
119                return result
120            except ReportException as error:
121                self.log.exception("Generate report error!", exc_info=False)
122                exception = error
123            except (ConnectionResetError,  # pylint:disable=undefined-variable
124                    ConnectionRefusedError,  # pylint:disable=undefined-variable
125                    ConnectionAbortedError) as error:  # pylint:disable=undefined-variable
126                self.log.error("error type: {}, error: {}".format
127                               (error.__class__.__name__, error))
128                # check hdc if is running
129                if not HdcHelper.check_if_hdc_running():
130                    LOG.debug("{} not running, set device {} {} false".format(
131                        HdcHelper.CONNECTOR_NAME, self.device_sn, ConfigConst.recover_state))
132                    self.set_recover_state(False)
133                    callback_to_outer(self, "recover failed")
134                    raise error
135                callback_to_outer(self, "error:{}, prepare to recover".format(error))
136                if not self.recover_device():
137                    LOG.debug("Set device {} {} false".format(
138                        self.device_sn, ConfigConst.recover_state))
139                    self.set_recover_state(False)
140                    callback_to_outer(self, "recover failed")
141                    raise error
142                exception = error
143                callback_to_outer(self, "recover success")
144            except HdcError as error:
145                self.log.error("error type: {}, error: {}".format(error.__class__.__name__, error))
146                callback_to_outer(self, "error:{}, prepare to recover".format(error))
147                if not self.recover_device():
148                    LOG.debug("Set device {} {} false".format(
149                        self.device_sn, ConfigConst.recover_state))
150                    self.set_recover_state(False)
151                    callback_to_outer(self, "recover failed")
152                    raise error
153                exception = error
154                callback_to_outer(self, "recover success")
155            except Exception as error:
156                self.log.exception("error type: {}, error: {}".format(
157                    error.__class__.__name__, error), exc_info=False)
158                exception = error
159        raise exception
160
161    return device_action
162
163
164@Plugin(type=Plugin.DEVICE, id=DeviceOsType.default)
165class Device(IDevice):
166    """
167    Class representing a device.
168
169    Each object of this class represents one device in xDevice,
170    including handles to hdc, fastboot, and test agent (DeviceTest.apk).
171
172    Attributes:
173        device_sn: A string that's the serial number of the device.
174    """
175
176    device_sn = None
177    host = None
178    port = None
179    usb_type = DeviceConnectorType.hdc
180    is_timeout = False
181    device_hilog_proc = None
182    device_os_type = DeviceOsType.default
183    test_device_state = None
184    device_allocation_state = DeviceAllocationState.available
185    label = ProductForm.phone
186    log = platform_logger("Device")
187    device_state_monitor = None
188    reboot_timeout = 2 * 60 * 1000
189    _device_log_collector = None
190
191    _proxy = None
192    _abc_proxy = None
193    _agent_mode = AgentMode.bin
194    initdevice = True
195    d_port = 8011
196    abc_d_port = 8012
197    _uitestdeamon = None
198    rpc_timeout = 300
199    device_id = None
200    reconnecttimes = 0
201    _h_port = None
202    oh_module_package = None
203    module_ablity_name = None
204    _device_report_path = None
205    test_platform = Platform.ohos
206    _webview = None
207    _is_root = None
208
209    model_dict = {
210        'default': ProductForm.phone,
211        'phone': ProductForm.phone,
212        'car': ProductForm.car,
213        'tv': ProductForm.television,
214        'watch': ProductForm.watch,
215        'wearable': ProductForm.wearable,
216        'tablet': ProductForm.tablet,
217        '2in1': ProductForm._2in1,
218        'nosdcard': ProductForm.phone
219    }
220
221    device_params = {
222        DeviceProperties.system_sdk: "",
223        DeviceProperties.system_version: "",
224        DeviceProperties.build_number: "",
225        DeviceProperties.cpu_abi: "",
226        DeviceProperties.device_form: "PHYSICAL",
227        DeviceProperties.software_version: "",
228        DeviceProperties.fault_code: "",
229        DeviceProperties.fold_screen: "",
230        DeviceProperties.hardware: "",
231        DeviceProperties.is_ark: "",
232        DeviceProperties.mac: "",
233        DeviceProperties.mobile_service: "",
234        DeviceProperties.model: "",
235        DeviceProperties.rom: "",
236        DeviceProperties.rooted: "",
237        DeviceProperties.sn: "",
238        DeviceProperties.xres: "",
239        DeviceProperties.yres: "",
240        DeviceProperties.manufacturer: "",
241        DeviceProperties.kind: 2
242    }
243
244    device_params_command = {
245        DeviceProperties.system_sdk: "const.ohos.apiversion",
246        DeviceProperties.system_version: "",
247        DeviceProperties.build_number: "",
248        DeviceProperties.cpu_abi: "const.product.cpu.abilist",
249        DeviceProperties.device_form: "",
250        DeviceProperties.software_version: "const.product.software.version",
251        DeviceProperties.fault_code: "",
252        DeviceProperties.fold_screen: "",
253        DeviceProperties.hardware: "ohos.boot.hardware",
254        DeviceProperties.is_ark: "",
255        DeviceProperties.mac: "",
256        DeviceProperties.mobile_service: "ro.odm.config.modem_number",
257        DeviceProperties.model: "ohos.boot.hardware",
258        DeviceProperties.rom: "",
259        DeviceProperties.rooted: "",
260        DeviceProperties.xres: "",
261        DeviceProperties.yres: "",
262        DeviceProperties.manufacturer: "const.product.manufacturer",
263        DeviceProperties.kind: ""
264    }
265
266    def __init__(self):
267        self.extend_value = {}
268        self.device_lock = threading.RLock()
269        self.forward_ports = []
270        self.forward_ports_abc = []
271        self.proxy_listener = None
272        self.win_proxy_listener = None
273        self.device_props = {}
274        self.device_description = {}
275
276    def __eq__(self, other):
277        return self.device_sn == other.__get_serial__() and \
278            self.device_os_type == other.device_os_type and \
279            self.host == other.host
280
281    def init_description(self):
282        if self.device_description:
283            return
284        desc = {
285            DeviceProperties.sn: convert_serial(self.device_sn),
286            DeviceProperties.model: self.get_property_value("const.product.model"),
287            DeviceProperties.type_: self.get_device_type(),
288            DeviceProperties.platform: self._get_device_platform(),
289            DeviceProperties.version: self.get_property_value(
290                self.device_params_command.get(DeviceProperties.software_version)),
291            DeviceProperties.others: self.device_props
292        }
293        self.device_description.update(desc)
294
295    def __set_serial__(self, device_sn=""):
296        self.device_sn = device_sn
297        return self.device_sn
298
299    def __get_serial__(self):
300        return self.device_sn
301
302    def extend_device_props(self):
303        if self.device_props:
304            return
305        try:
306            query_bin_path = get_file_absolute_path(QUERY_DEVICE_PROP_BIN)
307        except ParamError:
308            query_bin_path = ""
309        if query_bin_path == "":
310            return
311        self.push_file(query_bin_path, DEVICE_TEMP_PATH)
312        file_name = os.path.basename(query_bin_path)
313        cmd = f"cd {DEVICE_TEMP_PATH} && chmod +x {file_name} && ./{file_name}"
314        out = self.execute_shell_command(
315            cmd, timeout=5 * 1000, output_flag=False, retry=RETRY_ATTEMPTS, abort_on_exception=False).strip()
316        if not out:
317            return
318        LOG.info(out)
319        params = parse_strings_key_value(out)
320        self.device_props.update(params)
321
322    def get(self, key=None, default=None):
323        if not key:
324            return default
325        value = getattr(self, key, None)
326        if value:
327            return value
328        else:
329            return self.extend_value.get(key, default)
330
331    def recover_device(self):
332        if not self.get_recover_state():
333            LOG.debug("Device %s %s is false, cannot recover device" % (
334                self.device_sn, ConfigConst.recover_state))
335            return False
336
337        result = self.device_state_monitor.wait_for_device_available(self.reboot_timeout)
338        if result:
339            self.device_log_collector.restart_catch_device_log()
340        return result
341
342    def _get_device_platform(self):
343        self.test_platform = "OpenHarmony"
344        return self.test_platform
345
346    def get_device_type(self):
347        try:
348            model = self.get_property("const.product.devicetype",
349                                      abort_on_exception=True)
350        except ShellCommandUnresponsiveException:
351            model = "default"
352        model = "default" if model == "" else model
353        self.label = self.model_dict.get(model, ProductForm.phone)
354        return self.label
355
356    def get_property(self, prop_name, retry=RETRY_ATTEMPTS,
357                     abort_on_exception=False):
358        """
359        Hdc command, ddmlib function.
360        """
361        if not self.get_recover_state():
362            return ""
363        command = "param get %s" % prop_name
364        stdout = self.execute_shell_command(command, timeout=5 * 1000,
365                                            output_flag=False,
366                                            retry=retry,
367                                            abort_on_exception=abort_on_exception).strip()
368        if stdout:
369            LOG.debug(stdout)
370        return stdout
371
372    def get_property_value(self, prop_name, retry=RETRY_ATTEMPTS,
373                           abort_on_exception=False):
374        """
375        Hdc command, ddmlib function.
376        """
377        if not self.get_recover_state():
378            return ""
379        command = "param get %s" % prop_name
380        stdout = self.execute_shell_command(command, timeout=5 * 1000,
381                                            output_flag=False,
382                                            retry=retry,
383                                            abort_on_exception=abort_on_exception).strip()
384        if "fail" in stdout:
385            return ""
386        return stdout
387
388    @perform_device_action
389    def connector_command(self, command, **kwargs):
390        timeout = int(kwargs.get("timeout", TIMEOUT)) / 1000
391        error_print = bool(kwargs.get("error_print", True))
392        join_result = bool(kwargs.get("join_result", False))
393        timeout_msg = '' if timeout == 300.0 else \
394            " with timeout %ss" % timeout
395        if self.host != "127.0.0.1":
396            cmd = [HdcHelper.CONNECTOR_NAME, "-s", "{}:{}".format(self.host, self.port), "-t", self.device_sn]
397        else:
398            cmd = [HdcHelper.CONNECTOR_NAME, "-t", self.device_sn]
399        LOG.debug("{} execute command {} {} {}".format(convert_serial(self.device_sn),
400                                                       HdcHelper.CONNECTOR_NAME,
401                                                       command, timeout_msg))
402        if isinstance(command, list):
403            cmd.extend(command)
404        else:
405            command = command.strip()
406            cmd.extend(command.split(" "))
407        result = exec_cmd(cmd, timeout, error_print, join_result)
408        if not result:
409            return result
410        is_print = bool(kwargs.get("is_print", True))
411        if is_print:
412            for line in str(result).split("\n"):
413                if line.strip():
414                    LOG.debug(line.strip())
415        return result
416
417    @perform_device_action
418    def execute_shell_command(self, command, timeout=TIMEOUT,
419                              receiver=None, **kwargs):
420        if not receiver:
421            collect_receiver = CollectingOutputReceiver()
422            HdcHelper.execute_shell_command(
423                self, command, timeout=timeout,
424                receiver=collect_receiver, **kwargs)
425            return collect_receiver.output
426        else:
427            return HdcHelper.execute_shell_command(
428                self, command, timeout=timeout,
429                receiver=receiver, **kwargs)
430
431    def execute_shell_cmd_background(self, command, timeout=TIMEOUT,
432                                     receiver=None):
433        status = HdcHelper.execute_shell_command(self, command,
434                                                 timeout=timeout,
435                                                 receiver=receiver)
436
437        self.wait_for_device_not_available(DEFAULT_UNAVAILABLE_TIMEOUT)
438        self.device_state_monitor.wait_for_device_available(BACKGROUND_TIME)
439        cmd = "target mount"
440        self.connector_command(cmd)
441        self.device_log_collector.restart_catch_device_log()
442        return status
443
444    def wait_for_device_not_available(self, wait_time):
445        return self.device_state_monitor.wait_for_device_not_available(
446            wait_time)
447
448    def _wait_for_device_online(self, wait_time=None):
449        return self.device_state_monitor.wait_for_device_online(wait_time)
450
451    def _do_reboot(self):
452        HdcHelper.reboot(self)
453        self.wait_for_boot_completion()
454
455    def _reboot_until_online(self):
456        self._do_reboot()
457
458    def reboot(self):
459        self._reboot_until_online()
460        self.device_log_collector.restart_catch_device_log()
461
462    @perform_device_action
463    def install_package(self, package_path, command=""):
464        if package_path is None:
465            raise HdcError(ErrorMessage.Device.Code_0303005)
466        return HdcHelper.install_package(self, package_path, command)
467
468    @perform_device_action
469    def uninstall_package(self, package_name):
470        return HdcHelper.uninstall_package(self, package_name)
471
472    @perform_device_action
473    def push_file(self, local, remote, **kwargs):
474        """
475        Push a single file.
476        The top directory won't be created if is_create is False (by default)
477        and vice versa
478        """
479        local = "\"{}\"".format(local)
480        remote = "\"{}\"".format(remote)
481        if local is None:
482            raise HdcError(ErrorMessage.Device.Code_0303001)
483
484        remote_is_dir = kwargs.get("remote_is_dir", False)
485        if remote_is_dir:
486            ret = self.execute_shell_command("test -d %s && echo 0" % remote, retry=0)
487            if not (ret != "" and len(str(ret).split()) != 0 and
488                    str(ret).split()[0] == "0"):
489                self.execute_shell_command("mkdir -p %s" % remote, retry=0)
490
491        if self.host != "127.0.0.1":
492            self.connector_command("file send {} {}".format(local, remote), retry=0)
493        else:
494            is_create = kwargs.get("is_create", False)
495            timeout = kwargs.get("timeout", TIMEOUT)
496            HdcHelper.push_file(self, local, remote, is_create=is_create,
497                                timeout=timeout)
498        if not self.is_file_exist(remote):
499            err_msg = ErrorMessage.Device.Code_0303004.format(local, remote)
500            LOG.error(err_msg)
501            raise HdcError(err_msg)
502
503    @perform_device_action
504    def pull_file(self, remote, local, **kwargs):
505        """
506        Pull a single file.
507        The top directory won't be created if is_create is False (by default)
508        and vice versa
509        """
510        local = "\"{}\"".format(local)
511        remote = "\"{}\"".format(remote)
512        self.connector_command("file recv {} {}".format(remote, local), retry=0)
513
514    @property
515    def is_root(self):
516        if self._is_root is None:
517            ret = self.execute_shell_command("whoami")
518            LOG.debug(ret)
519            self._is_root = True if "root" in ret else False
520        return self._is_root
521
522    def is_directory(self, path):
523        path = check_path_legal(path)
524        output = self.execute_shell_command("ls -ld {}".format(path))
525        if output and output.startswith('d'):
526            return True
527        return False
528
529    def is_file_exist(self, file_path):
530        file_path = check_path_legal(file_path)
531        output = self.execute_shell_command("ls {}".format(file_path))
532        if output and "No such file or directory" not in output:
533            return True
534        return False
535
536    def get_recover_result(self, retry=RETRY_ATTEMPTS):
537        command = "param get bootevent.boot.completed"
538        stdout = self.execute_shell_command(command, timeout=5 * 1000,
539                                            output_flag=False, retry=retry,
540                                            abort_on_exception=True).strip()
541        LOG.debug("device recover status: {}".format(stdout))
542        return stdout
543
544    def set_recover_state(self, state):
545        with self.device_lock:
546            setattr(self, ConfigConst.recover_state, state)
547            if not state:
548                self.test_device_state = TestDeviceState.NOT_AVAILABLE
549                self.device_allocation_state = DeviceAllocationState.unavailable
550                self.call_proxy_listener()
551
552    def get_recover_state(self, default_state=True):
553        with self.device_lock:
554            state = getattr(self, ConfigConst.recover_state, default_state)
555            return state
556
557    def wait_for_boot_completion(self):
558        """Waits for the device to boot up.
559
560        Returns:
561            True if the device successfully finished booting, False otherwise.
562        """
563        return self.device_state_monitor.wait_for_boot_complete(self.reboot_timeout)
564
565    @classmethod
566    def check_recover_result(cls, recover_result):
567        return "true" in recover_result
568
569    @property
570    def device_log_collector(self):
571        if self._device_log_collector is None:
572            self._device_log_collector = DeviceLogCollector(self)
573        return self._device_log_collector
574
575    def close(self):
576        self.reconnecttimes = 0
577        try:
578            from devicetest.controllers.tools.recorder.record_agent import RecordAgent
579            if RecordAgent.instance:
580                RecordAgent.instance.terminate()
581        except Exception as error:
582            self.log.error(' RecordAgent terminate error: {}.'.format(str(error)))
583
584    def reset(self):
585        self.log.debug("start reset device...")
586        self.call_proxy_listener()
587        if self._proxy is not None:
588            self._proxy.close()
589        self._proxy = None
590        if self._uitestdeamon is not None:
591            self._uitestdeamon = None
592        if self.is_bin and not self.kill_uitest:
593            self.stop_harmony_rpc(kill_uitest=False)
594        else:
595            self.stop_harmony_rpc()
596        self.remove_ports()
597        self.device_log_collector.stop_restart_catch_device_log()
598
599    @property
600    def kill_uitest(self):
601        task_args = Variables.config.taskargs
602        return task_args.get("kill_uitest", "").lower() == "true"
603
604    @property
605    def is_bin(self):
606        # _agent_mode init in device test driver
607        # 0 is hap, 1 is abc, 2 is bin
608        return False if self._agent_mode == AgentMode.hap else True
609
610    def set_agent_mode(self, mode: AgentMode = AgentMode.bin):
611        if not mode:
612            mode = AgentMode.bin
613        if mode == AgentMode.hap and not self.is_root:
614            LOG.debug("Current device is not root, can not set hap mode, change to bin mode.")
615            self._agent_mode = AgentMode.bin
616        else:
617            self._agent_mode = mode
618
619        if self._agent_mode == AgentMode.hap:
620            LOG.debug("Current mode is normal mode.")
621        else:
622            self._agent_mode = AgentMode.bin
623            LOG.debug("Current mode is binary mode.")
624
625    def check_if_bin(self):
626        ret = False
627        self._agent_mode = AgentMode.abc
628        base_version = tuple("4.1.3.9".split("."))
629        uitest_version = self.execute_shell_command("/system/bin/uitest --version")
630        self.log.debug("uitest version is {}".format(uitest_version))
631        if check_uitest_version(uitest_version, base_version):
632            self._agent_mode = AgentMode.bin
633            ret = True
634        self.log.debug("{}".format("Binary agent run in {} mode".format(self._agent_mode)))
635        return ret
636
637    def _check_developer_mode_status(self):
638        if not self.is_root:
639            return True
640        status = self.execute_shell_command("param get const.security.developermode.state")
641        self.log.debug(status)
642        if status and status.strip() == "true":
643            return True
644        else:
645            return False
646
647    @property
648    def proxy(self):
649        """The first rpc session initiated on this device. None if there isn't
650        one.
651        """
652        try:
653            if self._proxy is None:
654                self.log.debug("{}".format("Hap agent run in {} mode".format(self._agent_mode)))
655                # check uitest
656                self.check_uitest_status()
657                self._proxy = self.get_harmony()
658        except HDCFPortError as error:
659            raise error
660        except AppInstallError as error:
661            raise error
662        except OHOSRpcNotRunningError as error:
663            raise error
664        except Exception as error:
665            self._proxy = None
666            self.log.error("DeviceTest-10012 proxy:%s" % str(error))
667        return self._proxy
668
669    @property
670    def abc_proxy(self):
671        """The first rpc session initiated on this device. None if there isn't
672        one.
673        """
674        try:
675            if self._abc_proxy is None:
676                # check uitest
677                self.check_uitest_status()
678                self._abc_proxy = self.get_harmony(start_abc=True)
679        except HDCFPortError as error:
680            raise error
681        except OHOSRpcNotRunningError as error:
682            raise error
683        except Exception as error:
684            self._abc_proxy = None
685            self.log.error("DeviceTest-10012 abc_proxy:%s" % str(error))
686        return self._abc_proxy
687
688    @property
689    def uitestdeamon(self):
690        from devicetest.controllers.uitestdeamon import \
691            UiTestDeamon
692        if self._uitestdeamon is None:
693            self._uitestdeamon = UiTestDeamon(self)
694        return self._uitestdeamon
695
696    @classmethod
697    def set_module_package(cls, module_packag):
698        cls.oh_module_package = module_packag
699
700    @classmethod
701    def set_moudle_ablity_name(cls, module_ablity_name):
702        cls.module_ablity_name = module_ablity_name
703
704    @property
705    def is_oh(self):
706        return True
707
708    def get_harmony(self, start_abc=False):
709        if self.initdevice:
710            if start_abc:
711                self.start_abc_rpc(re_install_rpc=True)
712            else:
713                self.start_harmony_rpc(re_install_rpc=True)
714        # clear old port,because abc and fast mode will not remove port
715        self.fport_tcp_port(start_abc=start_abc)
716        rpc_proxy = None
717        try:
718            from devicetest.controllers.openharmony import OpenHarmony
719            rpc_proxy = OpenHarmony(port=self._h_port, addr=self.host, timeout=self.rpc_timeout, device=self)
720        except Exception as error:
721            self.log.error(' proxy init error: {}.'.format(str(error)))
722        return rpc_proxy
723
724    def start_uitest(self):
725        result = ""
726        if self.is_bin:
727            result = self.execute_shell_command("{} start-daemon singleness".format(UITEST_PATH))
728        else:
729            share_mem_mode = False
730            base_version = [3, 2, 2, 2]
731            uitest_version = self.execute_shell_command("{} --version".format(UITEST_PATH))
732            if uitest_version and re.match(r'^\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}', uitest_version):
733                uitest_version = uitest_version.split(".")
734                for index, _ in enumerate(uitest_version):
735                    if int(uitest_version[index]) > base_version[index]:
736                        share_mem_mode = True
737                        break
738            else:
739                share_mem_mode = True
740            if share_mem_mode:
741                if not self.is_file_exist(UITEST_SHMF):
742                    self.log.debug('Path {} not exist, create it.'.format(UITEST_SHMF))
743                    self.execute_shell_command("echo abc > {}".format(UITEST_SHMF))
744                    self.execute_shell_command("chmod -R 666 {}".format(UITEST_SHMF))
745                result = self.execute_shell_command("{} start-daemon {}".format(UITEST_PATH, UITEST_SHMF))
746            else:
747                result = self.execute_shell_command(UITEST_COMMAND)
748        self.log.debug('start uitest, {}'.format(result))
749
750    def start_harmony_rpc(self, re_install_rpc=False, reconnect=False):
751        if not self._check_developer_mode_status():
752            raise OHOSDeveloperModeNotTrueError(ErrorMessage.Device.Code_0303015, device=self)
753
754        if not reconnect and self.check_rpc_status(check_abc=False, check_times=1) == SUCCESS_CODE:
755            if (hasattr(sys, ConfigConst.env_pool_cache) and
756                getattr(sys, ConfigConst.env_pool_cache, False)) \
757                    or not re_install_rpc:
758                self.log.debug('Harmony rpc already start!!!!')
759                return
760        if re_install_rpc:
761            try:
762                from devicetest.controllers.openharmony import OpenHarmony
763                OpenHarmony.install_harmony_rpc(self)
764            except ImportError as error:  # pylint:disable=undefined-variable
765                self.log.debug(str(error))
766                self.log.error('please check devicetest extension module is exist.')
767                raise Exception(ErrorMessage.Config.Code_0302006)
768            except AppInstallError as error:
769                raise error
770            except Exception as error:
771                self.log.debug(str(error))
772                self.log.error('root device init RPC error.')
773                raise Exception(ErrorMessage.Config.Code_0302006)
774        if not self.is_bin:
775            self.stop_harmony_rpc(reconnect=reconnect)
776        else:
777            self.log.debug('Binary mode, kill hap if hap is running.')
778            self.stop_harmony_rpc(kill_uitest=False, reconnect=reconnect)
779        cmd = "aa start -a {}.ServiceAbility -b {}".format(DEVICETEST_HAP_PACKAGE_NAME, DEVICETEST_HAP_PACKAGE_NAME)
780        result = self.execute_shell_command(cmd)
781        self.log.debug('start devicetest ability, {}'.format(result))
782        if "successfully" not in result:
783            raise OHOSRpcStartFailedError(ErrorMessage.Device.Code_0303016.format(
784                "system" if self.is_bin else "normal", result), device=self)
785        if not self.is_bin:
786            self.start_uitest()
787        time.sleep(1)
788        check_result = self.check_rpc_status(check_abc=False)
789        self.raise_exception(check_result)
790
791    def raise_exception(self, error_code: str):
792        if error_code == SUCCESS_CODE:
793            return
794        rpc_mode = "system" if self.is_bin else "normal"
795        if error_code == ErrorMessage.Device.Code_0303025.code:
796            raise OHOSRpcProcessNotFindError(ErrorMessage.Device.Code_0303025, device=self)
797        elif error_code == ErrorMessage.Device.Code_0303026.code:
798            raise OHOSRpcPortNotFindError(ErrorMessage.Device.Code_0303026, device=self)
799        elif error_code == ErrorMessage.Device.Code_0303027.code:
800            raise OHOSRpcProcessNotFindError(ErrorMessage.Device.Code_0303023.format(rpc_mode), device=self)
801        elif error_code == ErrorMessage.Device.Code_0303028.code:
802            raise OHOSRpcPortNotFindError(ErrorMessage.Device.Code_0303024.format(rpc_mode), device=self)
803
804    def start_abc_rpc(self, re_install_rpc=False, reconnect=False):
805        if re_install_rpc:
806            try:
807                from devicetest.controllers.openharmony import OpenHarmony
808                OpenHarmony.init_agent_resource(self)
809            except ImportError as error:  # pylint:disable=undefined-variable
810                self.log.debug(str(error))
811                self.log.error('please check devicetest extension module is exist.')
812                raise error
813            except Exception as error:
814                self.log.debug(str(error))
815                self.log.error('root device init abc RPC error.')
816                raise error
817        if reconnect:
818            self.stop_harmony_rpc(kill_hap=False, reconnect=reconnect)
819        if self.is_bin and self.check_rpc_status(check_abc=True, check_times=1) == SUCCESS_CODE:
820            self.log.debug('Harmony abc rpc already start!!!!')
821            return
822        self.start_uitest()
823        time.sleep(1)
824        check_result = self.check_rpc_status(check_abc=True)
825        self.raise_exception(check_result)
826
827    def stop_harmony_rpc(self, kill_uitest=True, kill_hap=True, reconnect=False):
828        if not self.get_recover_state():
829            LOG.warning("device state is false, skip stop harmony rpc.")
830            return
831        proc_pids = self.get_devicetest_proc_pid()
832        for index, pid in enumerate(proc_pids):
833            if not kill_uitest and kill_hap and index == 1:
834                continue
835            if not kill_hap and kill_uitest and index == 2:
836                continue
837            if pid != "":
838                if reconnect:
839                    name = "uitest" if index != 2 else "devicetest"
840                    self._dump_pid_info(pid, name)
841                cmd = 'kill -9 {}'.format(pid)
842                ret = self.execute_shell_command(cmd)
843                if index == 2 and "Operation not permitted" in ret:
844                    stop_hap = 'aa force-stop {}'.format(DEVICETEST_HAP_PACKAGE_NAME)
845                    self.execute_shell_command(stop_hap)
846        self.wait_listen_port_disappear()
847
848    def wait_listen_port_disappear(self):
849        end_time = time.time() + 5
850        times = 0
851        while time.time() < end_time:
852            if times == 0:
853                is_print = True
854            else:
855                is_print = False
856            if not self.is_harmony_rpc_socket_running(self.d_port, is_print=is_print):
857                break
858            times += 1
859        if times > 0:
860            self.is_harmony_rpc_socket_running(self.d_port, is_print=True)
861
862    def _dump_pid_info(self, pid, name):
863        try:
864            path = os.path.join(self._device_report_path, "log", "pid_info")
865            if not os.path.exists(path):
866                os.makedirs(path)
867            file_path = os.path.join(path, "{}_pid_info_{}.txt".format(name, pid))
868            pid_info_file = os.open(file_path, os.O_WRONLY | os.O_CREAT | os.O_APPEND, FilePermission.mode_755)
869            ret = self.execute_shell_command("dumpcatcher -p {}".format(pid))
870            with os.fdopen(pid_info_file, "a") as pid_info_file_pipe:
871                pid_info_file_pipe.write(ret)
872        except Exception as e:
873            LOG.error("Dump {} pid info fail. Error: {}".format(pid, e))
874
875    # check uitest if running well, otherwise kill it first
876    def check_uitest_status(self):
877        if not self.is_root:
878            ret = self.execute_shell_command("uitest --version")
879            if "inaccessible or not found" in ret:
880                raise OHOSDeveloperModeNotTrueError(ErrorMessage.Device.Code_0303021, device=self)
881        self.log.debug('Check uitest running status.')
882        proc_pids = self.get_devicetest_proc_pid()
883        if proc_pids[2] != "" and not self._proxy:
884            self.execute_shell_command('kill -9 {}'.format(proc_pids[2]))
885        if self.is_bin and proc_pids[0] != "":
886            self.execute_shell_command('kill -9 {}'.format(proc_pids[0]))
887            self.log.debug('Uitest is running in normal mode, current mode is bin/abc, wait it exit.')
888        if not self.is_bin and proc_pids[1] != "":
889            self.execute_shell_command('kill -9 {}'.format(proc_pids[1]))
890            self.log.debug('Uitest is running in abc mode, current mode is normal, wait it exit.')
891        self.log.debug('Finish check uitest running status.')
892
893    def get_devicetest_proc_pid(self):
894        # # 0-uitest 1-uitest-sigleness 2-hap
895        proc_pids = [""] * 3
896        if not self.is_bin:
897            proc_pids[0] = self.execute_shell_command("pidof {}".format(UITEST_NAME)).strip()
898        else:
899            cmd = 'ps -ef | grep {}'.format(UITEST_SINGLENESS)
900            proc_running = self.execute_shell_command(cmd).strip()
901            proc_running = proc_running.split("\n")
902            for data in proc_running:
903                if UITEST_SINGLENESS in data and "grep" not in data and EXTENSION_NAME not in data:
904                    data = data.split()
905                    proc_pids[1] = data[1]
906        proc_pids[2] = self.execute_shell_command("pidof {}".format(DEVICETEST_HAP_PACKAGE_NAME)).strip()
907
908        return proc_pids
909
910    def is_harmony_rpc_running(self, check_abc=False):
911        proc_pids = self.get_devicetest_proc_pid()
912        if not self.is_bin:
913            self.log.debug('is_proc_running: agent pid: {}, uitest pid: {}'.format(proc_pids[2], proc_pids[0]))
914            if proc_pids[2] != "" and proc_pids[0] != "":
915                return True
916        else:
917            if check_abc:
918                self.log.debug('is_proc_running: uitest pid: {}'.format(proc_pids[1]))
919                if proc_pids[1] != "":
920                    return True
921            else:
922                self.log.debug('is_proc_running: agent pid: {}'.format(proc_pids[2]))
923                if proc_pids[2] != "":
924                    return True
925        return False
926
927    def is_harmony_rpc_socket_running(self, port: int, check_server: bool = True, is_print: bool = True) -> bool:
928        if not self.is_root:
929            return True
930        out = self.execute_shell_command("netstat -atn | grep :{}".format(port))
931        if is_print:
932            self.log.debug(out)
933        if out:
934            out = out.split("\n")
935            for data in out:
936                if check_server:
937                    if "LISTEN" in data and str(port) in data:
938                        return True
939                else:
940                    if "hdcd" in data and str(port) in data:
941                        return True
942        return False
943
944    def check_rpc_status(self, check_abc: bool = False, check_server: bool = True, check_times: int = 3) -> str:
945        port = self.d_port if not check_abc else self.abc_d_port
946        for i in range(check_times):
947            if self.is_harmony_rpc_running(check_abc):
948                break
949            else:
950                self.log.debug("check harmony rpc failed {} times, If is check bin(abc): {}, "
951                               "try to check again in 1 seconds".format(i + 1, check_abc))
952                time.sleep(1)
953        else:
954            self.log.debug(f"{check_times} times check failed.")
955            self.log.debug('Harmony rpc is not running!!!! If is check bin(abc): {}'.format(check_abc))
956            if check_abc:
957                return ErrorMessage.Device.Code_0303025.code
958            else:
959                return ErrorMessage.Device.Code_0303027.code
960
961        for i in range(check_times):
962            if self.is_harmony_rpc_socket_running(port, check_server=check_server):
963                break
964            else:
965                self.log.debug("Harmony rpc port is not find {} times, If is check bin(abc): {}, "
966                               "try to find again in 1 seconds".format(i + 1, check_abc))
967                time.sleep(1)
968        else:
969            self.log.debug('Harmony rpc port is not find!!!! If is check bin(abc): {}'.format(check_abc))
970            if check_abc:
971                return ErrorMessage.Device.Code_0303026.code
972            else:
973                return ErrorMessage.Device.Code_0303028.code
974        self.log.debug('Harmony rpc is running!!!! If is check abc: {}'.format(check_abc))
975        return SUCCESS_CODE
976
977    def call_proxy_listener(self):
978        if ((self.is_bin and self._abc_proxy) or
979                (not self.is_bin and self._proxy)):
980            if self.proxy_listener is not None:
981                self.proxy_listener(is_exception=True)
982        if self._proxy:
983            if self.win_proxy_listener is not None:
984                self.win_proxy_listener(is_exception=True)
985
986    def install_app(self, remote_path, command):
987        try:
988            ret = self.execute_shell_command(
989                "pm install %s %s" % (command, remote_path))
990            if ret is not None and str(
991                    ret) != "" and "Unknown option: -g" in str(ret):
992                return self.execute_shell_command(
993                    "pm install -r %s" % remote_path)
994            return ret
995        except Exception as error:
996            self.log.error("%s, maybe there has a warning box appears "
997                           "when installing RPC." % error)
998            return False
999
1000    def uninstall_app(self, package_name):
1001        try:
1002            ret = self.execute_shell_command("pm uninstall %s" % package_name)
1003            self.log.debug(ret)
1004            return ret
1005        except Exception as err:
1006            self.log.error('DeviceTest-20013 uninstall: %s' % str(err))
1007            return False
1008
1009    def check_need_install_bin(self):
1010        # check if agent.so exist
1011        if self._agent_mode == AgentMode.bin:
1012            ret = self.execute_shell_command("ls -l /data/local/tmp/agent.so")
1013        else:
1014            ret = self.execute_shell_command("ls -l /data/local/tmp/app.abc")
1015        LOG.debug(ret)
1016        if ret is None or "No such file or directory" in ret:
1017            return True
1018        return False
1019
1020    def reconnect(self, waittime=60, proxy=None):
1021        """
1022        @summary: Reconnect the device.
1023        """
1024        self.call_proxy_listener()
1025
1026        if not self.wait_for_boot_completion():
1027            if self._proxy:
1028                self._proxy.close()
1029            self._proxy = None
1030            if self._abc_proxy:
1031                self._abc_proxy.close()
1032            self._abc_proxy = None
1033            self._uitestdeamon = None
1034            self.remove_ports()
1035            raise Exception("Reconnect timed out.")
1036
1037        if not self.is_root and self._agent_mode == AgentMode.hap:
1038            LOG.debug("Reconnect device is not root, change hap mode to bin mode.")
1039            self._agent_mode = AgentMode.bin
1040
1041        if self._proxy and (proxy is None or proxy == AgentMode.hap):
1042            self.start_harmony_rpc(re_install_rpc=True, reconnect=True)
1043            self.fport_tcp_port(start_abc=False)
1044            try:
1045                self._proxy.init(port=self._h_port, addr=self.host, device=self)
1046            except Exception as _:
1047                time.sleep(3)
1048                self._proxy.init(port=self._h_port, addr=self.host, device=self)
1049
1050        if self.is_bin and self._abc_proxy and (proxy is None or proxy == AgentMode.bin):
1051            re_install = self.check_need_install_bin()
1052            self.start_abc_rpc(re_install_rpc=re_install, reconnect=True)
1053            self.fport_tcp_port(start_abc=True)
1054            try:
1055                self._abc_proxy.init(port=self._h_port, addr=self.host, device=self)
1056            except Exception as _:
1057                time.sleep(3)
1058                self._abc_proxy.init(port=self._h_port, addr=self.host, device=self)
1059
1060        if self._uitestdeamon is not None:
1061            self._uitestdeamon.init(self)
1062
1063        if self._proxy:
1064            return self._proxy
1065        return None
1066
1067    def fport_tcp_port(self, start_abc: bool = False) -> bool:
1068        filter_ports = []
1069        for i in range(3):
1070            host_port = self.get_local_port(start_abc=start_abc, filter_ports=filter_ports)
1071            remote_port = self.abc_d_port if start_abc else self.d_port
1072            cmd = "fport tcp:{} tcp:{}".format(host_port, remote_port)
1073            result = self.connector_command(cmd)
1074            if "Fail" not in result:
1075                self._h_port = host_port
1076                LOG.debug(f"hdc fport success, get_proxy host_port: {host_port}, remote_port: {remote_port}")
1077                return True
1078            filter_ports.append(host_port)
1079            LOG.debug(f"The {i + 1} time HDC fport tcp port fail.")
1080            from devicetest.utils.util import check_port_state
1081            check_port_state(host_port)
1082        else:
1083            err_msg = ErrorMessage.Device.Code_0303022
1084            LOG.error(err_msg)
1085            raise HDCFPortError(err_msg)
1086
1087    def get_local_port(self, start_abc: bool, filter_ports: list = None):
1088        if filter_ports is None:
1089            filter_ports = []
1090        from devicetest.utils.util import get_forward_port
1091        host = self.host
1092        port = None
1093        h_port = get_forward_port(self, host, port, filter_ports)
1094        if start_abc:
1095            self.remove_ports(normal=False)
1096            self.forward_ports_abc.append(h_port)
1097        else:
1098            self.remove_ports(abc=False)
1099            self.forward_ports.append(h_port)
1100        self.log.info("tcp forward port: {} for {}".format(
1101            h_port, convert_serial(self.device_sn)))
1102        return h_port
1103
1104    def remove_ports(self, abc: bool = True, normal: bool = True):
1105        if abc:
1106            for port in self.forward_ports_abc:
1107                cmd = "fport rm tcp:{} tcp:{}".format(
1108                    port, self.abc_d_port)
1109                self.connector_command(cmd)
1110            self.forward_ports_abc.clear()
1111        if normal:
1112            for port in self.forward_ports:
1113                cmd = "fport rm tcp:{} tcp:{}".format(
1114                    port, self.d_port)
1115                self.connector_command(cmd)
1116            self.forward_ports.clear()
1117
1118    def remove_history_ports(self, port):
1119        cmd = "fport ls"
1120        res = self.connector_command(cmd, is_print=False)
1121        res = res.split("\n")
1122        for data in res:
1123            if str(port) in data:
1124                data = data.split('\t')
1125                cmd = "fport rm {}".format(data[0][1:-1])
1126                self.connector_command(cmd, is_print=False)
1127
1128    def take_picture(self, name):
1129        """
1130        @summary: 截取手机屏幕图片并保存
1131        @param  name: 保存的图片名称,通过getTakePicturePath方法获取保存全路径
1132        """
1133        path = ""
1134        try:
1135            if self._device_report_path is None:
1136                from xdevice import EnvPool
1137                self._device_report_path = EnvPool.report_path
1138            temp_path = os.path.join(self._device_report_path, "temp")
1139            if not os.path.exists(temp_path):
1140                os.makedirs(temp_path)
1141            path = os.path.join(temp_path, name)
1142            picture_name = os.path.basename(name)
1143            out = self.execute_shell_command("snapshot_display -f /data/local/tmp/{}".format(picture_name))
1144            self.log.debug("result: {}".format(out))
1145            if "error" in out and "success" not in out:
1146                return False
1147            else:
1148                self.pull_file("/data/local/tmp/{}".format(picture_name), path)
1149        except Exception as error:
1150            self.log.error("devicetest take_picture: {}".format(str(error)))
1151        return path
1152
1153    def capture(self, link: str, path: str, ext: str = ".png") -> Tuple[str, str]:
1154        """
1155        截图步骤实现,未使用参数是保持一致
1156        :param link: 链接
1157        :param path: 保存路径
1158        :param ext: 后缀
1159        :return: link path 链接
1160        """
1161        remote = "/data/local/tmp/xdevice_screenshot{}".format(ext)
1162        new_ext = ".jpeg"
1163        link = link[:link.rfind(ext)] + new_ext
1164        path = path[:path.rfind(ext)] + new_ext
1165        remote = remote[:remote.rfind(ext)] + new_ext
1166        result = self.execute_shell_command("snapshot_display -f {}".format(remote), timeout=60000)
1167        LOG.debug("{}".format(result))
1168        # 适配非root
1169        if not self.is_root:
1170            time.sleep(1)
1171        self.pull_file(remote, path)
1172        self.execute_shell_command("rm -f {}".format(remote))
1173        return link, path
1174
1175    def set_device_report_path(self, path):
1176        self._device_report_path = path
1177
1178    def get_device_report_path(self):
1179        return self._device_report_path
1180
1181    def get_device_params(self, refresh=True):
1182        """
1183        获取设备属性信息
1184        @return:
1185        """
1186        if refresh:
1187            for key, value in self.device_params_command.items():
1188                if value and isinstance(value, str):
1189                    self.device_params[key] = self.get_property_value(value)
1190            self.device_params[DeviceProperties.sn] = self.device_sn
1191            try:
1192                result = self.execute_shell_command(
1193                    "snapshot_display -f /data/local/tmp/screen.png")
1194                if "success" not in result or "successfully" not in result:
1195                    result = self.execute_shell_command(
1196                        "snapshot_display -f /data/local/tmp/screen.jpeg")
1197                pattern = re.search(r"width \d+. height \d+", result)
1198                resolution = re.findall(r"\d+", pattern.group())
1199                self.device_params[DeviceProperties.xres] = resolution[0]
1200                self.device_params[DeviceProperties.yres] = resolution[1]
1201            except Exception as error:
1202                resolution = self.uitestdeamon.get_display_density()
1203                if resolution:
1204                    resolution = json.loads(resolution)
1205                    self.device_params[DeviceProperties.xres] = resolution.get("X",
1206                                                                               "")
1207                    self.device_params[DeviceProperties.yres] = resolution.get("Y",
1208                                                                               "")
1209        return copy.deepcopy(self.device_params)
1210
1211    def execute_shell_in_daemon(self, command):
1212        if self.host != "127.0.0.1":
1213            cmd = [HdcHelper.CONNECTOR_NAME, "-s", "{}:{}".format(
1214                self.host, self.port), "-t", self.device_sn, "shell"]
1215        else:
1216            cmd = [HdcHelper.CONNECTOR_NAME, "-t", self.device_sn, "shell"]
1217        LOG.debug("{} execute command {} {} in daemon".format(
1218            convert_serial(self.device_sn), HdcHelper.CONNECTOR_NAME, command))
1219        if isinstance(command, list):
1220            cmd.extend(command)
1221        else:
1222            command = command.strip()
1223            cmd.extend(command.split(" "))
1224        sys_type = platform.system()
1225        process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
1226                                   shell=False,
1227                                   preexec_fn=None if sys_type == "Windows"
1228                                   else os.setsid,
1229                                   close_fds=True)
1230        return process
1231
1232    def check_advance_option(self, extend_value, **kwargs):
1233        if not isinstance(extend_value, dict):
1234            return True
1235
1236        advance_dict = extend_value.get(AdvanceDeviceOption.advance, None)
1237        if not isinstance(advance_dict, dict):
1238            return True
1239        # 匹配设备别名
1240        adv_alias = advance_dict.get(DeviceProperties.alias, "")
1241        adv_label = advance_dict.get(AdvanceDeviceOption.label, "")
1242        alias = (adv_alias or adv_label).strip().upper()
1243        if alias:
1244            is_matched = False
1245            selection = "selection:{alias:%s}" % alias
1246            # 兼容-di参数
1247            device_info = kwargs.get("device_info", None)
1248            if device_info and isinstance(device_info, list):
1249                di_alias = ""
1250                for info in device_info:
1251                    if not isinstance(info, dict) or info.get("sn", "") != self.device_sn:
1252                        continue
1253                    di_alias = info.get("type", "")
1254                    is_matched = di_alias == alias
1255                    break
1256                if not is_matched:
1257                    LOG.error("device:{sn:%s, alias:%s} mismatch %s, please check "
1258                              "the [-di] running params!" % (self.device_sn, di_alias, selection))
1259                    LOG.info("current [-di] running params is: %s" % device_info)
1260                    return False
1261                self.device_id = di_alias
1262            elif self.device_id == alias:
1263                is_matched = True
1264            if not is_matched:
1265                LOG.error("device:{sn:%s, alias:%s} mismatch %s" % (
1266                    self.device_sn, self.device_id, selection))
1267                return False
1268
1269        # 匹配设备额外的信息
1270        advance_type = advance_dict.get(AdvanceDeviceOption.type, None)
1271        advance_product = advance_dict.get(AdvanceDeviceOption.product, None)
1272        advance_version = advance_dict.get(AdvanceDeviceOption.version, None)
1273        advance_product_cmd = advance_dict.get(AdvanceDeviceOption.product_cmd, None)
1274        advance_version_cmd = advance_dict.get(AdvanceDeviceOption.version_cmd, None)
1275        if advance_type and advance_type == AdvanceDeviceOption.command \
1276                and advance_product_cmd \
1277                and advance_version_cmd:
1278            if advance_product is not None:
1279                self.device_params[DeviceProperties.model] = \
1280                    self.execute_shell_command(advance_product_cmd).strip()
1281            if advance_version is not None:
1282                self.device_params[DeviceProperties.system_version] = \
1283                    self.execute_shell_command(advance_version_cmd).strip()
1284        else:
1285            if advance_product is not None:
1286                self.device_params[DeviceProperties.model] = \
1287                    self.get_property(self.device_params_command.get(DeviceProperties.model, ""))
1288            if advance_version is not None:
1289                self.device_params[DeviceProperties.system_version] = \
1290                    self.get_property(self.device_params_command.get(DeviceProperties.system_version, ""))
1291
1292        if advance_product and advance_version:
1293            return True if advance_product == self.device_params.get(DeviceProperties.model, "") \
1294                           and advance_version == self.device_params.get(DeviceProperties.system_version, "") else False
1295        elif advance_product and advance_version is None:
1296            return True if advance_product == self.device_params.get(DeviceProperties.model, "") else False
1297        elif advance_product is None and advance_version:
1298            return True if advance_version == self.device_params.get(DeviceProperties.system_version, "") else False
1299        else:
1300            return True
1301
1302    @property
1303    def webview(self):
1304        from devicetest.controllers.web.webview import WebView
1305        if self._webview is None:
1306            self._webview = WebView(self)
1307        return self._webview
1308
1309
1310class DeviceLogCollector:
1311    hilog_file_address = []
1312    log_file_address = []
1313    hdc_module_name = ""
1314    device = None
1315    restart_proc = []
1316    device_log_level = None
1317    is_clear = True
1318    device_hilog_proc = None
1319    need_pull_hdc_log = False  # 是否需要拉取hdc日志
1320
1321    # log
1322    hilog_file_pipes = []
1323    device_log = dict()
1324    hilog = dict()
1325    log_proc = dict()
1326    hilog_proc = dict()
1327
1328    _cur_thread_ident = None
1329    _cur_thread_name = None
1330    _hilog_begin_time = None
1331    _latest_pull_abnormal_log_time = time.time()
1332
1333    def __init__(self, device):
1334        self.device = device
1335
1336    def restart_catch_device_log(self):
1337        self._sync_device_time()
1338        for _, path in enumerate(self.hilog_file_address):
1339            hilog_open = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_APPEND,
1340                                 FilePermission.mode_755)
1341            with os.fdopen(hilog_open, "a") as hilog_file_pipe:
1342                _, proc = self.start_catch_device_log(hilog_file_pipe=hilog_file_pipe)
1343                self.restart_proc.append(proc)
1344
1345    def stop_restart_catch_device_log(self):
1346        # when device free stop restart log proc
1347        for _, proc in enumerate(self.restart_proc):
1348            self.stop_catch_device_log(proc)
1349        self.restart_proc.clear()
1350        self.hilog_file_address.clear()
1351        self.log_file_address.clear()
1352
1353    def _set_device_log_level(self, **kwargs):
1354        # 设备日志级别
1355        if not self.device_log_level:
1356            log_level = kwargs.get("log_level", "INFO")
1357            if log_level not in LOGLEVEL:
1358                self.device_log_level = "INFO"
1359            else:
1360                self.device_log_level = log_level
1361        cmd = "hilog -b {}".format(self.device_log_level)
1362        self.device.execute_shell_command(cmd)
1363
1364    def _set_hilog_begin_time(self):
1365        """设置日志抓取任务的开始时间"""
1366        cur_thread = threading.current_thread()
1367        cur_thread_id, cur_thread_name = cur_thread.ident, cur_thread.name
1368        if self._cur_thread_ident != cur_thread_id or self._cur_thread_name != cur_thread_name:
1369            # 用例连续运行,执行线程会变换,这时更新线程id和开始时间
1370            self._cur_thread_ident, self._cur_thread_name = cur_thread_id, cur_thread_name
1371            self._hilog_begin_time = time.time()
1372
1373    def start_catch_device_log(self, log_file_pipe=None, hilog_file_pipe=None, **kwargs):
1374        """
1375        Starts hdc log for each device in separate subprocesses and save
1376        the logs in files.
1377        """
1378        self._sync_device_time()
1379        self._set_device_log_level(**kwargs)
1380        self._set_hilog_begin_time()
1381
1382        device_hilog_proc = None
1383        if hilog_file_pipe:
1384            command = "hilog"
1385            if self.device.host != "127.0.0.1":
1386                cmd = [HdcHelper.CONNECTOR_NAME, "-s", "{}:{}".format(self.device.host, self.device.port),
1387                       "-t", self.device.device_sn, "shell", command]
1388            else:
1389                cmd = [HdcHelper.CONNECTOR_NAME, "-t", self.device.device_sn, "shell", command]
1390            LOG.info("execute command: %s" % " ".join(cmd).replace(
1391                self.device.device_sn, convert_serial(self.device.device_sn)))
1392            device_hilog_proc = start_standing_subprocess(
1393                cmd, hilog_file_pipe)
1394        self.device_hilog_proc = device_hilog_proc
1395        return None, device_hilog_proc
1396
1397    def stop_catch_device_log(self, proc):
1398        """
1399        Stops all hdc log subprocesses.
1400        """
1401        if proc:
1402            self.device.log.debug("Stop catch device hilog.")
1403            stop_standing_subprocess(proc)
1404        if self.hdc_module_name:
1405            self.pull_hdc_log(self.hdc_module_name)
1406            self.hdc_module_name = None
1407
1408    def start_hilog_task(self, **kwargs):
1409        """启动日志抓取任务。若设备没有在抓取日志,则设置启动抓取(不删除历史日志,以免影响其他组件运行)"""
1410        log_size = kwargs.get("log_size", "10M").upper()
1411        if re.search("^[0-9]+[K?]$", log_size) is None \
1412                and re.search("^[0-9]+[M?]$", log_size) is None:
1413            self.device.log.debug("hilog task Invalid size string {}. Use default 10M".format(log_size))
1414            log_size = "10M"
1415        matcher = re.match("^[0-9]+", log_size)
1416        if log_size.endswith("K") and int(matcher.group(0)) < 64:
1417            self.device.log.debug("hilog task file size should be "
1418                                  "in range [64.0K, 512.0M], use min value 64K, now is {}".format(log_size))
1419            log_size = "64K"
1420        if log_size.endswith("M") and int(matcher.group(0)) > 512:
1421            self.device.log.debug("hilog task file size should be "
1422                                  "in range [64.0K, 512.0M], use min value 512M, now is {}".format(log_size))
1423            log_size = "512M"
1424
1425        self._sync_device_time()
1426        self._set_device_log_level(**kwargs)
1427        self._set_hilog_begin_time()
1428
1429        # 启动日志任务
1430        out = self.device.execute_shell_command('hilog -w query')
1431        LOG.debug(out)
1432        if 'No running persistent task' in out:
1433            # 启动hilog日志任务
1434            self.device.execute_shell_command('hilog -w start -l {} -n 1000'.format(log_size))
1435        if 'kmsg' not in out:
1436            # 启动kmsg日志任务
1437            self.device.execute_shell_command('hilog -w start -t kmsg -l {} -n 1000'.format(log_size))
1438
1439    def stop_hilog_task(self, log_name, repeat=1, repeat_round=1, **kwargs):
1440        module_name = kwargs.get("module_name", "")
1441        round_folder = f"round{repeat_round}" if repeat > 1 else ""
1442        base_dir = os.path.join(self.device.get_device_report_path(), "log", round_folder)
1443        if module_name:
1444            path = os.path.join(base_dir, module_name)
1445        else:
1446            path = base_dir
1447        os.makedirs(path, exist_ok=True)
1448
1449        # 获取hilog日志
1450        hilog_local = os.path.join(path, "hilog_{}".format(log_name))
1451        self.get_period_log({HILOG_PATH: ""}, hilog_local)
1452        # 拉取最新的字典文件。若hilog都没拉出来,字典文件也不用拉取了
1453        if os.path.exists(hilog_local):
1454            out = self.device.execute_shell_command('ls -t {} | grep hilog_dict'.format(HILOG_PATH))
1455            LOG.debug(out)
1456            log_dicts = out.strip().replace('\r', '').split('\n') if out else []
1457            if log_dicts:
1458                self.device.pull_file(HILOG_PATH + '/' + log_dicts[0], hilog_local, retry=0)
1459            else:
1460                LOG.warning("hilog_dict does not exist, and it won't be pulled")
1461
1462        # 获取crash日志
1463        self.start_get_crash_log(log_name, repeat=repeat, repeat_round=repeat_round, module_name=module_name)
1464        # 获取额外路径的日志
1465        extras_dirs = kwargs.get("extras_dirs", "")
1466        self.pull_extra_log_files(log_name, module_name, extras_dirs, round_folder=round_folder)
1467        # 获取hdc日志
1468        self.pull_hdc_log(module_name, round_folder=round_folder)
1469
1470    def pull_hdc_log(self, module_name, round_folder=""):
1471        if not self.need_pull_hdc_log:
1472            return
1473        report_path = self.device.get_device_report_path()
1474        if not report_path:
1475            return
1476        hdc_log_save_path = os.path.join(
1477            report_path, "log", round_folder, module_name, "hdc_log")
1478        if not os.path.exists(hdc_log_save_path):
1479            os.makedirs(hdc_log_save_path)
1480        temp_dir = tempfile.gettempdir()
1481        files = os.listdir(temp_dir)
1482        for file in files:
1483            if "hdc.log" in file or "hdclast.log" in file:
1484                hdc_log = os.path.join(temp_dir, file)
1485                shutil.copy(hdc_log, hdc_log_save_path)
1486
1487    def start_get_crash_log(self, task_name, repeat=1, repeat_round=1, **kwargs):
1488        self._set_hilog_begin_time()
1489        module_name = kwargs.get("module_name", "")
1490        round_folder = f"round{repeat_round}" if repeat > 1 else ""
1491        base_dir = os.path.join(self.device.get_device_report_path(), "log", round_folder)
1492        crash_folder = f"crash_log_{task_name}"
1493        if module_name:
1494            crash_path = os.path.join(base_dir, module_name, crash_folder)
1495        else:
1496            crash_path = os.path.join(base_dir, crash_folder)
1497
1498        crash_logs = {
1499            NATIVE_CRASH_PATH: ["cppcrash"],
1500            # JS_CRASH_PATH设为空,表示拉取这个路径下用例运行期间生成的文件
1501            JS_CRASH_PATH: [],
1502            ROOT_PATH: ["SERVICE_BLOCK", "appfreeze"]
1503        }
1504        remotes = {}
1505        for base_path, folders in crash_logs.items():
1506            for folder in folders:
1507                remote_dir = base_path + '/' + folder if folder else base_path
1508                remotes.update({remote_dir: ""})
1509            else:
1510                remotes.update({base_path: ""})
1511        self.get_period_log(remotes, crash_path)
1512
1513    def clear_crash_log(self):
1514        warnings.warn('this function is no longer supported', DeprecationWarning)
1515
1516    def _sync_device_time(self):
1517        # 先同步PC和设备的时间
1518        iso_time_format = '%Y-%m-%d %H:%M:%S'
1519        cur_time = get_cst_time().strftime(iso_time_format)
1520        self.device.execute_shell_command("date '{}'".format(cur_time))
1521
1522    def add_log_address(self, log_file_address, hilog_file_address):
1523        # record to restart catch log when reboot device
1524        if log_file_address:
1525            self.log_file_address.append(log_file_address)
1526        if hilog_file_address:
1527            self.hilog_file_address.append(hilog_file_address)
1528            self.hdc_module_name = os.path.basename(os.path.dirname(hilog_file_address))
1529
1530    def remove_log_address(self, log_file_address, hilog_file_address):
1531        if log_file_address and log_file_address in self.log_file_address:
1532            self.log_file_address.remove(log_file_address)
1533        if hilog_file_address and hilog_file_address in self.hilog_file_address:
1534            self.hilog_file_address.remove(hilog_file_address)
1535
1536    def pull_extra_log_files(self, task_name: str, module_name: str, dirs: str, round_folder: str = ""):
1537        if not dirs or dirs == 'None':
1538            return
1539        extra_log_path = os.path.join(
1540            self.device.get_device_report_path(), "log", round_folder,
1541            module_name, "extra_log_{}".format(task_name))
1542        remotes = {}
1543        for item in dirs.split(';'):
1544            item = item.strip().rstrip('/')
1545            if not item:
1546                continue
1547            # 若是文件夹,则保存在本地的同名文件夹内
1548            on_folder = os.path.basename(item) if self.device.is_directory(item) else ""
1549            remotes.update({item: on_folder})
1550        self.get_period_log(remotes, extra_log_path)
1551
1552    def clear_device_logs(self):
1553        """清除设备侧日志"""
1554        warnings.warn('this function is no longer supported', DeprecationWarning)
1555
1556    def clear_kingking_dir_log(self):
1557        def execute_clear_cmd(path: str, prefix: list):
1558            for pre in prefix:
1559                clear_cmd = "rm -f {}/{}/*".format(path, pre)
1560                self.device.execute_shell_command(clear_cmd)
1561
1562        execute_clear_cmd(KINGKONG_PATH, ["data", "fault_route", "screenshots"])
1563
1564    def get_abnormal_hilog(self, local_hilog_path):
1565        warnings.warn('this function is no longer supported', DeprecationWarning)
1566
1567    def get_period_log(self, remotes: dict, local_path: str, begin_time: float = None, find_cmd: str = None):
1568        """在目录下查找一段时间内有更改的文件,并将文件拉到本地
1569        remotes: dict, {查找目录: 使用子文件夹存放文件(通常不用子文件夹)}
1570        local_path: str, pull to local path
1571        begin_time: float, the beginning time
1572        """
1573        begin = begin_time if begin_time else self._hilog_begin_time
1574        if not begin:
1575            LOG.warning('hilog task begin time is not set')
1576            return
1577        minutes, seconds = divmod(int(time.time() - begin), 60)
1578        if minutes < 0:
1579            LOG.warning('get logs in a period failed!')
1580            LOG.warning('当前日志打印的时间先与开始抓取日志的时间')
1581            return
1582        if minutes > 0:
1583            units = '%dm' % minutes
1584        else:
1585            units = '%ds' % seconds
1586
1587        for remote_dir, on_folder in remotes.items():
1588            find = find_cmd if find_cmd else 'find {}'.format(remote_dir)
1589            cmd = '{} -type f -mtime -{}'.format(find, units)
1590            out = self.device.execute_shell_command(cmd)
1591            if 'No such file or directory' in out:
1592                continue
1593            LOG.debug(out)
1594            log_files = [f for f in out.strip().replace('\r', '').split('\n') if f and f.startswith(remote_dir)]
1595            if not log_files:
1596                continue
1597            local_dir = os.path.join(local_path, on_folder) if on_folder else local_path
1598            os.makedirs(local_dir, exist_ok=True)
1599            os.chmod(local_dir, FilePermission.mode_755)
1600            for log_file in log_files:
1601                # 避免将整个文件夹拉下来和重复拉取文件
1602                if log_file == remote_dir and self.device.is_directory(log_file) \
1603                        or os.path.exists(log_file) and os.path.isfile(log_file):
1604                    continue
1605                self.device.pull_file(log_file, local_dir, retry=0)
1606
1607    def start_catch_log(self, request, **kwargs):
1608        hilog_size = kwargs.get("hilog_size", "10M")
1609        log_level = request.config.device_log.get(ConfigConst.tag_loglevel, "INFO")
1610        pull_hdc_log_status = request.config.device_log.get(ConfigConst.tag_hdc, None)
1611        self.need_pull_hdc_log = False if pull_hdc_log_status and pull_hdc_log_status.lower() == "false" else True
1612        self.device.set_device_report_path(request.config.report_path)
1613        self.start_hilog_task(log_size=hilog_size, log_level=log_level)
1614
1615    def stop_catch_log(self, request, **kwargs):
1616        self.remove_log_address(self.device_log.get(self.device.device_sn, None),
1617                                self.hilog.get(self.device.device_sn, None))
1618        serial = "{}_{}".format(str(self.device.__get_serial__()), time.time_ns())
1619        log_tar_file_name = "{}".format(str(serial).replace(":", "_"))
1620        self.stop_hilog_task(
1621            log_tar_file_name,
1622            module_name=request.get_module_name(),
1623            extras_dirs=request.config.device_log.get(ConfigConst.tag_dir),
1624            repeat=request.config.repeat,
1625            repeat_round=request.get_repeat_round())
1626