• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18import json
19import re
20import shutil
21import time
22import os
23import threading
24import copy
25import platform
26import subprocess
27import sys
28import tempfile
29import warnings
30from typing import Tuple
31from xml.etree import ElementTree
32
33from xdevice import DeviceOsType
34from xdevice import Variables
35from xdevice import FilePermission
36from xdevice import ParamError
37from xdevice import ProductForm
38from xdevice import ReportException
39from xdevice import IDevice
40from xdevice import platform_logger
41from xdevice import Plugin
42from xdevice import exec_cmd
43from xdevice import ConfigConst
44from xdevice import HdcError
45from xdevice import DeviceAllocationState
46from xdevice import DeviceConnectorType
47from xdevice import TestDeviceState
48from xdevice import AdvanceDeviceOption
49from xdevice import convert_serial
50from xdevice import check_path_legal
51from xdevice import start_standing_subprocess
52from xdevice import stop_standing_subprocess
53from xdevice import DeviceProperties
54from xdevice import get_cst_time
55from xdevice import get_file_absolute_path
56from xdevice import Platform
57from xdevice import AppInstallError
58from xdevice import AgentMode
59from xdevice import check_uitest_version
60from xdevice import ShellCommandUnresponsiveException
61from ohos.environment.dmlib import HdcHelper
62from ohos.environment.dmlib import CollectingOutputReceiver
63from ohos.utils import parse_strings_key_value
64from ohos.error import ErrorMessage
65from ohos.exception import OHOSRpcNotRunningError
66from ohos.exception import OHOSDeveloperModeNotTrueError
67from ohos.exception import OHOSRpcProcessNotFindError
68from ohos.exception import OHOSRpcPortNotFindError
69from ohos.exception import OHOSRpcStartFailedError
70from ohos.exception import HDCFPortError
71
72__all__ = ["Device"]
73TIMEOUT = 300 * 1000
74RETRY_ATTEMPTS = 2
75DEFAULT_UNAVAILABLE_TIMEOUT = 20 * 1000
76BACKGROUND_TIME = 2 * 60 * 1000
77LOG = platform_logger("Device")
78DEVICETEST_HAP_PACKAGE_NAME = "com.ohos.devicetest"
79DEVICE_TEMP_PATH = "/data/local/tmp"
80QUERY_DEVICE_PROP_BIN = "testcases/queryStandard"
81UITEST_NAME = "uitest"
82UITEST_SINGLENESS = "singleness"
83EXTENSION_NAME = "--extension-name"
84UITEST_PATH = "/system/bin/uitest"
85UITEST_SHMF = "/data/app/el2/100/base/{}/cache/shmf".format(DEVICETEST_HAP_PACKAGE_NAME)
86UITEST_COMMAND = "{} start-daemon 0123456789".format(UITEST_PATH)
87NATIVE_CRASH_PATH = "/data/log/faultlog/temp"
88JS_CRASH_PATH = "/data/log/faultlog/faultlogger"
89ROOT_PATH = "/data/log/faultlog"
90KINGKONG_PATH = "/data/local/tmp/kingkongDir"
91LOGLEVEL = ["DEBUG", "INFO", "WARN", "ERROR", "FATAL"]
92HILOG_PATH = "/data/log/hilog"
93SUCCESS_CODE = "0"
94
95
96def perform_device_action(func):
97    def callback_to_outer(device, msg):
98        # callback to decc ui
99        if getattr(device, "callback_method", None):
100            device.callback_method(msg)
101
102    def device_action(self, *args, **kwargs):
103        if not self.get_recover_state():
104            LOG.debug("Device {} {} is false".format(self.device_sn,
105                                                     ConfigConst.recover_state))
106            return None
107        # avoid infinite recursion, such as device reboot
108        abort_on_exception = bool(kwargs.get("abort_on_exception", False))
109        if abort_on_exception:
110            result = func(self, *args, **kwargs)
111            return result
112
113        tmp = int(kwargs.get("retry", RETRY_ATTEMPTS))
114        retry = tmp + 1 if tmp > 0 else 1
115        exception = None
116        for _ in range(retry):
117            try:
118                result = func(self, *args, **kwargs)
119                return result
120            except ReportException as error:
121                self.log.exception("Generate report error!", exc_info=False)
122                exception = error
123            except (ConnectionResetError,  # pylint:disable=undefined-variable
124                    ConnectionRefusedError,  # pylint:disable=undefined-variable
125                    ConnectionAbortedError) as error:  # pylint:disable=undefined-variable
126                self.log.error("error type: {}, error: {}".format
127                               (error.__class__.__name__, error))
128                # check hdc if is running
129                if not HdcHelper.check_if_hdc_running():
130                    LOG.debug("{} not running, set device {} {} false".format(
131                        HdcHelper.CONNECTOR_NAME, self.device_sn, ConfigConst.recover_state))
132                    self.set_recover_state(False)
133                    callback_to_outer(self, "recover failed")
134                    raise error
135                callback_to_outer(self, "error:{}, prepare to recover".format(error))
136                if not self.recover_device():
137                    LOG.debug("Set device {} {} false".format(
138                        self.device_sn, ConfigConst.recover_state))
139                    self.set_recover_state(False)
140                    callback_to_outer(self, "recover failed")
141                    raise error
142                exception = error
143                callback_to_outer(self, "recover success")
144            except HdcError as error:
145                self.log.error("error type: {}, error: {}".format(error.__class__.__name__, error))
146                callback_to_outer(self, "error:{}, prepare to recover".format(error))
147                if not self.recover_device():
148                    LOG.debug("Set device {} {} false".format(
149                        self.device_sn, ConfigConst.recover_state))
150                    self.set_recover_state(False)
151                    callback_to_outer(self, "recover failed")
152                    raise error
153                exception = error
154                callback_to_outer(self, "recover success")
155            except Exception as error:
156                self.log.exception("error type: {}, error: {}".format(
157                    error.__class__.__name__, error), exc_info=False)
158                exception = error
159        raise exception
160
161    return device_action
162
163
164@Plugin(type=Plugin.DEVICE, id=DeviceOsType.default)
165class Device(IDevice):
166    """
167    Class representing a device.
168
169    Each object of this class represents one device in xDevice,
170    including handles to hdc, fastboot, and test agent (DeviceTest.apk).
171
172    Attributes:
173        device_sn: A string that's the serial number of the device.
174    """
175
176    device_sn = None
177    host = None
178    port = None
179    usb_type = DeviceConnectorType.hdc
180    is_timeout = False
181    device_hilog_proc = None
182    device_os_type = DeviceOsType.default
183    test_device_state = None
184    device_allocation_state = DeviceAllocationState.available
185    label = ProductForm.phone
186    log = platform_logger("Device")
187    device_state_monitor = None
188    reboot_timeout = 2 * 60 * 1000
189    _device_log_collector = None
190
191    _proxy = None
192    _abc_proxy = None
193    _agent_mode = AgentMode.bin
194    initdevice = True
195    d_port = 8011
196    abc_d_port = 8012
197    _uitestdeamon = None
198    rpc_timeout = 300
199    device_id = None
200    reconnecttimes = 0
201    _h_port = None
202    oh_module_package = None
203    module_ablity_name = None
204    _device_report_path = None
205    test_platform = Platform.ohos
206    _webview = None
207    _is_root = None
208
209    model_dict = {
210        'default': ProductForm.phone,
211        'phone': ProductForm.phone,
212        'car': ProductForm.car,
213        'tv': ProductForm.television,
214        'watch': ProductForm.watch,
215        'wearable': ProductForm.wearable,
216        'tablet': ProductForm.tablet,
217        '2in1': ProductForm._2in1,
218        'nosdcard': ProductForm.phone
219    }
220
221    device_params = {
222        DeviceProperties.system_sdk: "",
223        DeviceProperties.system_version: "",
224        DeviceProperties.build_number: "",
225        DeviceProperties.cpu_abi: "",
226        DeviceProperties.device_form: "PHYSICAL",
227        DeviceProperties.software_version: "",
228        DeviceProperties.fault_code: "",
229        DeviceProperties.fold_screen: "",
230        DeviceProperties.hardware: "",
231        DeviceProperties.is_ark: "",
232        DeviceProperties.mac: "",
233        DeviceProperties.mobile_service: "",
234        DeviceProperties.model: "",
235        DeviceProperties.rom: "",
236        DeviceProperties.rooted: "",
237        DeviceProperties.sn: "",
238        DeviceProperties.xres: "",
239        DeviceProperties.yres: "",
240        DeviceProperties.manufacturer: "",
241        DeviceProperties.kind: 2
242    }
243
244    device_params_command = {
245        DeviceProperties.system_sdk: "const.ohos.apiversion",
246        DeviceProperties.system_version: "",
247        DeviceProperties.build_number: "",
248        DeviceProperties.cpu_abi: "const.product.cpu.abilist",
249        DeviceProperties.device_form: "",
250        DeviceProperties.software_version: "const.product.software.version",
251        DeviceProperties.fault_code: "",
252        DeviceProperties.fold_screen: "",
253        DeviceProperties.hardware: "ohos.boot.hardware",
254        DeviceProperties.is_ark: "",
255        DeviceProperties.mac: "",
256        DeviceProperties.mobile_service: "ro.odm.config.modem_number",
257        DeviceProperties.model: "ohos.boot.hardware",
258        DeviceProperties.rom: "",
259        DeviceProperties.rooted: "",
260        DeviceProperties.xres: "",
261        DeviceProperties.yres: "",
262        DeviceProperties.manufacturer: "const.product.manufacturer",
263        DeviceProperties.kind: ""
264    }
265
266    def __init__(self):
267        self.extend_value = {}
268        self.device_lock = threading.RLock()
269        self.forward_ports = []
270        self.forward_ports_abc = []
271        self.proxy_listener = None
272        self.win_proxy_listener = None
273        self.device_props = {}
274        self.device_description = {}
275
276    def __eq__(self, other):
277        return self.device_sn == other.__get_serial__() and \
278            self.device_os_type == other.device_os_type and \
279            self.host == other.host
280
281    def init_description(self):
282        if self.device_description:
283            return
284        desc = {
285            DeviceProperties.sn: convert_serial(self.device_sn),
286            DeviceProperties.model: self.get_property_value("const.product.model"),
287            DeviceProperties.type_: self.get_device_type(),
288            DeviceProperties.platform: self._get_device_platform(),
289            DeviceProperties.version: self.get_property_value(
290                self.device_params_command.get(DeviceProperties.software_version)),
291            DeviceProperties.others: self.device_props
292        }
293        self.device_description.update(desc)
294
295    def __set_serial__(self, device_sn=""):
296        self.device_sn = device_sn
297        return self.device_sn
298
299    def __get_serial__(self):
300        return self.device_sn
301
302    def extend_device_props(self):
303        if self.device_props:
304            return
305        try:
306            query_bin_path = get_file_absolute_path(QUERY_DEVICE_PROP_BIN)
307        except ParamError:
308            query_bin_path = ""
309        if query_bin_path == "":
310            return
311        self.push_file(query_bin_path, DEVICE_TEMP_PATH)
312        file_name = os.path.basename(query_bin_path)
313        cmd = f"cd {DEVICE_TEMP_PATH} && chmod +x {file_name} && ./{file_name}"
314        out = self.execute_shell_command(
315            cmd, timeout=5 * 1000, output_flag=False, retry=RETRY_ATTEMPTS, abort_on_exception=False).strip()
316        if not out:
317            return
318        LOG.info(out)
319        params = parse_strings_key_value(out)
320        self.device_props.update(params)
321
322    def get(self, key=None, default=None):
323        if not key:
324            return default
325        value = getattr(self, key, None)
326        if value:
327            return value
328        else:
329            return self.extend_value.get(key, default)
330
331    def recover_device(self):
332        if not self.get_recover_state():
333            LOG.debug("Device %s %s is false, cannot recover device" % (
334                self.device_sn, ConfigConst.recover_state))
335            return False
336
337        result = self.device_state_monitor.wait_for_device_available(self.reboot_timeout)
338        if result:
339            self.device_log_collector.restart_catch_device_log()
340        return result
341
342    def _get_device_platform(self):
343        self.test_platform = "OpenHarmony"
344        return self.test_platform
345
346    def get_device_type(self):
347        try:
348            model = self.get_property("const.product.devicetype",
349                                      abort_on_exception=True)
350        except ShellCommandUnresponsiveException:
351            model = "default"
352        model = "default" if model == "" else model
353        self.label = self.model_dict.get(model, ProductForm.phone)
354        return self.label
355
356    def get_property(self, prop_name, retry=RETRY_ATTEMPTS,
357                     abort_on_exception=False):
358        """
359        Hdc command, ddmlib function.
360        """
361        if not self.get_recover_state():
362            return ""
363        command = "param get %s" % prop_name
364        stdout = self.execute_shell_command(command, timeout=5 * 1000,
365                                            output_flag=False,
366                                            retry=retry,
367                                            abort_on_exception=abort_on_exception).strip()
368        if stdout:
369            LOG.debug(stdout)
370        return stdout
371
372    def get_property_value(self, prop_name, retry=RETRY_ATTEMPTS,
373                           abort_on_exception=False):
374        """
375        Hdc command, ddmlib function.
376        """
377        if not self.get_recover_state():
378            return ""
379        command = "param get %s" % prop_name
380        stdout = self.execute_shell_command(command, timeout=5 * 1000,
381                                            output_flag=False,
382                                            retry=retry,
383                                            abort_on_exception=abort_on_exception).strip()
384        if "fail" in stdout:
385            return ""
386        return stdout
387
388    @perform_device_action
389    def connector_command(self, command, **kwargs):
390        timeout = int(kwargs.get("timeout", TIMEOUT)) / 1000
391        error_print = bool(kwargs.get("error_print", True))
392        join_result = bool(kwargs.get("join_result", False))
393        timeout_msg = '' if timeout == 300.0 else \
394            " with timeout %ss" % timeout
395        if self.host != "127.0.0.1":
396            cmd = [HdcHelper.CONNECTOR_NAME, "-s", "{}:{}".format(self.host, self.port), "-t", self.device_sn]
397        else:
398            cmd = [HdcHelper.CONNECTOR_NAME, "-t", self.device_sn]
399        LOG.debug("{} execute command {} {} {}".format(convert_serial(self.device_sn),
400                                                       HdcHelper.CONNECTOR_NAME,
401                                                       command, timeout_msg))
402        if isinstance(command, list):
403            cmd.extend(command)
404        else:
405            command = command.strip()
406            cmd.extend(command.split(" "))
407        result = exec_cmd(cmd, timeout, error_print, join_result)
408        if not result:
409            return result
410        is_print = bool(kwargs.get("is_print", True))
411        if is_print:
412            for line in str(result).split("\n"):
413                if line.strip():
414                    LOG.debug(line.strip())
415        return result
416
417    @perform_device_action
418    def execute_shell_command(self, command, timeout=TIMEOUT,
419                              receiver=None, **kwargs):
420        if not receiver:
421            collect_receiver = CollectingOutputReceiver()
422            HdcHelper.execute_shell_command(
423                self, command, timeout=timeout,
424                receiver=collect_receiver, **kwargs)
425            return collect_receiver.output
426        else:
427            return HdcHelper.execute_shell_command(
428                self, command, timeout=timeout,
429                receiver=receiver, **kwargs)
430
431    def execute_shell_cmd_background(self, command, timeout=TIMEOUT,
432                                     receiver=None):
433        status = HdcHelper.execute_shell_command(self, command,
434                                                 timeout=timeout,
435                                                 receiver=receiver)
436
437        self.wait_for_device_not_available(DEFAULT_UNAVAILABLE_TIMEOUT)
438        self.device_state_monitor.wait_for_device_available(BACKGROUND_TIME)
439        cmd = "target mount"
440        self.connector_command(cmd)
441        self.device_log_collector.restart_catch_device_log()
442        return status
443
444    def wait_for_device_not_available(self, wait_time):
445        return self.device_state_monitor.wait_for_device_not_available(
446            wait_time)
447
448    def _wait_for_device_online(self, wait_time=None):
449        return self.device_state_monitor.wait_for_device_online(wait_time)
450
451    def _do_reboot(self):
452        HdcHelper.reboot(self)
453        self.wait_for_boot_completion()
454
455    def _reboot_until_online(self):
456        self._do_reboot()
457
458    def reboot(self):
459        self._reboot_until_online()
460        self.device_log_collector.restart_catch_device_log()
461
462    @perform_device_action
463    def install_package(self, package_path, command=""):
464        if package_path is None:
465            raise HdcError(ErrorMessage.Device.Code_0303005)
466        return HdcHelper.install_package(self, package_path, command)
467
468    @perform_device_action
469    def uninstall_package(self, package_name):
470        return HdcHelper.uninstall_package(self, package_name)
471
472    @perform_device_action
473    def push_file(self, local, remote, **kwargs):
474        """
475        Push a single file.
476        The top directory won't be created if is_create is False (by default)
477        and vice versa
478        """
479        local = "\"{}\"".format(local)
480        remote = "\"{}\"".format(remote)
481        if local is None:
482            raise HdcError(ErrorMessage.Device.Code_0303001)
483
484        remote_is_dir = kwargs.get("remote_is_dir", False)
485        if remote_is_dir:
486            ret = self.execute_shell_command("test -d %s && echo 0" % remote, retry=0)
487            if not (ret != "" and len(str(ret).split()) != 0 and
488                    str(ret).split()[0] == "0"):
489                self.execute_shell_command("mkdir -p %s" % remote, retry=0)
490
491        if self.host != "127.0.0.1":
492            self.connector_command("file send {} {}".format(local, remote), retry=0)
493        else:
494            is_create = kwargs.get("is_create", False)
495            timeout = kwargs.get("timeout", TIMEOUT)
496            HdcHelper.push_file(self, local, remote, is_create=is_create,
497                                timeout=timeout)
498            if not self.is_file_exist(remote):
499                self.connector_command("file send {} {}".format(local, remote), retry=0)
500        if not self.is_file_exist(remote):
501            err_msg = ErrorMessage.Device.Code_0303004.format(local, remote)
502            LOG.error(err_msg)
503            raise HdcError(err_msg)
504
505    @perform_device_action
506    def pull_file(self, remote, local, **kwargs):
507        """
508        Pull a single file.
509        The top directory won't be created if is_create is False (by default)
510        and vice versa
511        """
512        local = "\"{}\"".format(local)
513        remote = "\"{}\"".format(remote)
514        self.connector_command("file recv {} {}".format(remote, local), retry=0)
515
516    @property
517    def is_root(self):
518        if self._is_root is None:
519            ret = self.execute_shell_command("whoami")
520            LOG.debug(ret)
521            self._is_root = True if "root" in ret else False
522        return self._is_root
523
524    def is_directory(self, path):
525        path = check_path_legal(path)
526        output = self.execute_shell_command("ls -ld {}".format(path))
527        if output and output.startswith('d'):
528            return True
529        return False
530
531    def is_file_exist(self, file_path):
532        file_path = check_path_legal(file_path)
533        output = self.execute_shell_command("ls {}".format(file_path))
534        if output and "No such file or directory" not in output:
535            return True
536        return False
537
538    def get_recover_result(self, retry=RETRY_ATTEMPTS):
539        command = "param get bootevent.boot.completed"
540        stdout = self.execute_shell_command(command, timeout=5 * 1000,
541                                            output_flag=False, retry=retry,
542                                            abort_on_exception=True).strip()
543        LOG.debug("device recover status: {}".format(stdout))
544        return stdout
545
546    def set_recover_state(self, state):
547        with self.device_lock:
548            setattr(self, ConfigConst.recover_state, state)
549            if not state:
550                self.test_device_state = TestDeviceState.NOT_AVAILABLE
551                self.device_allocation_state = DeviceAllocationState.unavailable
552                self.call_proxy_listener()
553
554    def get_recover_state(self, default_state=True):
555        with self.device_lock:
556            state = getattr(self, ConfigConst.recover_state, default_state)
557            return state
558
559    def wait_for_boot_completion(self):
560        """Waits for the device to boot up.
561
562        Returns:
563            True if the device successfully finished booting, False otherwise.
564        """
565        return self.device_state_monitor.wait_for_boot_complete(self.reboot_timeout)
566
567    @classmethod
568    def check_recover_result(cls, recover_result):
569        return "true" in recover_result
570
571    @property
572    def device_log_collector(self):
573        if self._device_log_collector is None:
574            self._device_log_collector = DeviceLogCollector(self)
575        return self._device_log_collector
576
577    def close(self):
578        self.reconnecttimes = 0
579        try:
580            from devicetest.controllers.tools.recorder.record_agent import RecordAgent
581            if RecordAgent.instance:
582                RecordAgent.instance.terminate()
583        except Exception as error:
584            self.log.error(' RecordAgent terminate error: {}.'.format(str(error)))
585
586    def reset(self):
587        self.log.debug("start reset device...")
588        self.call_proxy_listener()
589        if self._proxy is not None:
590            self._proxy.close()
591        self._proxy = None
592        if self._uitestdeamon is not None:
593            self._uitestdeamon = None
594        if self.is_bin and not self.kill_uitest:
595            self.stop_harmony_rpc(kill_uitest=False)
596        else:
597            self.stop_harmony_rpc()
598        self.remove_ports()
599        self.device_log_collector.stop_restart_catch_device_log()
600
601    @property
602    def kill_uitest(self):
603        task_args = Variables.config.taskargs
604        return task_args.get("kill_uitest", "").lower() == "true"
605
606    @property
607    def is_bin(self):
608        # _agent_mode init in device test driver
609        # 0 is hap, 1 is abc, 2 is bin
610        return False if self._agent_mode == AgentMode.hap else True
611
612    def set_agent_mode(self, mode: AgentMode = AgentMode.bin):
613        if not mode:
614            mode = AgentMode.bin
615        if mode == AgentMode.hap and not self.is_root:
616            LOG.debug("Current device is not root, can not set hap mode, change to bin mode.")
617            self._agent_mode = AgentMode.bin
618        else:
619            self._agent_mode = mode
620
621        if self._agent_mode == AgentMode.hap:
622            LOG.debug("Current mode is normal mode.")
623        else:
624            self._agent_mode = AgentMode.bin
625            LOG.debug("Current mode is binary mode.")
626
627    def check_if_bin(self):
628        ret = False
629        self._agent_mode = AgentMode.abc
630        base_version = tuple("4.1.3.9".split("."))
631        uitest_version = self.execute_shell_command("/system/bin/uitest --version")
632        self.log.debug("uitest version is {}".format(uitest_version))
633        if check_uitest_version(uitest_version, base_version):
634            self._agent_mode = AgentMode.bin
635            ret = True
636        self.log.debug("{}".format("Binary agent run in {} mode".format(self._agent_mode)))
637        return ret
638
639    def _check_developer_mode_status(self):
640        if not self.is_root:
641            return True
642        status = self.execute_shell_command("param get const.security.developermode.state")
643        self.log.debug(status)
644        if status and status.strip() == "true":
645            return True
646        else:
647            return False
648
649    @property
650    def proxy(self):
651        """The first rpc session initiated on this device. None if there isn't
652        one.
653        """
654        try:
655            if self._proxy is None:
656                self.log.debug("{}".format("Hap agent run in {} mode".format(self._agent_mode)))
657                # check uitest
658                self.check_uitest_status()
659                self._proxy = self.get_harmony()
660        except HDCFPortError as error:
661            raise error
662        except AppInstallError as error:
663            raise error
664        except OHOSRpcNotRunningError as error:
665            raise error
666        except Exception as error:
667            self._proxy = None
668            self.log.error("DeviceTest-10012 proxy:%s" % str(error))
669        return self._proxy
670
671    @property
672    def abc_proxy(self):
673        """The first rpc session initiated on this device. None if there isn't
674        one.
675        """
676        try:
677            if self._abc_proxy is None:
678                # check uitest
679                self.check_uitest_status()
680                self._abc_proxy = self.get_harmony(start_abc=True)
681        except HDCFPortError as error:
682            raise error
683        except OHOSRpcNotRunningError as error:
684            raise error
685        except Exception as error:
686            self._abc_proxy = None
687            self.log.error("DeviceTest-10012 abc_proxy:%s" % str(error))
688        return self._abc_proxy
689
690    @property
691    def uitestdeamon(self):
692        from devicetest.controllers.uitestdeamon import \
693            UiTestDeamon
694        if self._uitestdeamon is None:
695            self._uitestdeamon = UiTestDeamon(self)
696        return self._uitestdeamon
697
698    @classmethod
699    def set_module_package(cls, module_packag):
700        cls.oh_module_package = module_packag
701
702    @classmethod
703    def set_moudle_ablity_name(cls, module_ablity_name):
704        cls.module_ablity_name = module_ablity_name
705
706    @property
707    def is_oh(self):
708        return True
709
710    def get_harmony(self, start_abc=False):
711        if self.initdevice:
712            if start_abc:
713                self.start_abc_rpc(re_install_rpc=True)
714            else:
715                self.start_harmony_rpc(re_install_rpc=True)
716        # clear old port,because abc and fast mode will not remove port
717        self.fport_tcp_port(start_abc=start_abc)
718        rpc_proxy = None
719        try:
720            from devicetest.controllers.openharmony import OpenHarmony
721            rpc_proxy = OpenHarmony(port=self._h_port, addr=self.host, timeout=self.rpc_timeout, device=self)
722        except Exception as error:
723            self.log.error(' proxy init error: {}.'.format(str(error)))
724        return rpc_proxy
725
726    def start_uitest(self):
727        result = ""
728        if self.is_bin:
729            result = self.execute_shell_command("{} start-daemon singleness".format(UITEST_PATH))
730        else:
731            share_mem_mode = False
732            base_version = [3, 2, 2, 2]
733            uitest_version = self.execute_shell_command("{} --version".format(UITEST_PATH))
734            if uitest_version and re.match(r'^\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}', uitest_version):
735                uitest_version = uitest_version.split(".")
736                for index, _ in enumerate(uitest_version):
737                    if int(uitest_version[index]) > base_version[index]:
738                        share_mem_mode = True
739                        break
740            else:
741                share_mem_mode = True
742            if share_mem_mode:
743                if not self.is_file_exist(UITEST_SHMF):
744                    self.log.debug('Path {} not exist, create it.'.format(UITEST_SHMF))
745                    self.execute_shell_command("echo abc > {}".format(UITEST_SHMF))
746                    self.execute_shell_command("chmod -R 666 {}".format(UITEST_SHMF))
747                result = self.execute_shell_command("{} start-daemon {}".format(UITEST_PATH, UITEST_SHMF))
748            else:
749                result = self.execute_shell_command(UITEST_COMMAND)
750        self.log.debug('start uitest, {}'.format(result))
751
752    def start_harmony_rpc(self, re_install_rpc=False, reconnect=False):
753        if not self._check_developer_mode_status():
754            raise OHOSDeveloperModeNotTrueError(ErrorMessage.Device.Code_0303015, device=self)
755
756        if not reconnect and self.check_rpc_status(check_abc=False, check_times=1) == SUCCESS_CODE:
757            if (hasattr(sys, ConfigConst.env_pool_cache) and
758                getattr(sys, ConfigConst.env_pool_cache, False)) \
759                    or not re_install_rpc:
760                self.log.debug('Harmony rpc already start!!!!')
761                return
762        if re_install_rpc:
763            try:
764                from devicetest.controllers.openharmony import OpenHarmony
765                OpenHarmony.install_harmony_rpc(self)
766            except ImportError as error:  # pylint:disable=undefined-variable
767                self.log.debug(str(error))
768                self.log.error('please check devicetest extension module is exist.')
769                raise Exception(ErrorMessage.Config.Code_0302006)
770            except AppInstallError as error:
771                raise error
772            except Exception as error:
773                self.log.debug(str(error))
774                self.log.error('root device init RPC error.')
775                raise Exception(ErrorMessage.Config.Code_0302006)
776        if not self.is_bin:
777            self.stop_harmony_rpc(reconnect=reconnect)
778        else:
779            self.log.debug('Binary mode, kill hap if hap is running.')
780            self.stop_harmony_rpc(kill_uitest=False, reconnect=reconnect)
781        cmd = "aa start -a {}.ServiceAbility -b {}".format(DEVICETEST_HAP_PACKAGE_NAME, DEVICETEST_HAP_PACKAGE_NAME)
782        result = self.execute_shell_command(cmd)
783        self.log.debug('start devicetest ability, {}'.format(result))
784        if "successfully" not in result:
785            raise OHOSRpcStartFailedError(ErrorMessage.Device.Code_0303016.format(
786                "system" if self.is_bin else "normal", result), device=self)
787        if not self.is_bin:
788            self.start_uitest()
789        time.sleep(1)
790        check_result = self.check_rpc_status(check_abc=False)
791        self.raise_exception(check_result)
792
793    def raise_exception(self, error_code: str):
794        if error_code == SUCCESS_CODE:
795            return
796        rpc_mode = "system" if self.is_bin else "normal"
797        if error_code == ErrorMessage.Device.Code_0303025.code:
798            raise OHOSRpcProcessNotFindError(ErrorMessage.Device.Code_0303025, device=self)
799        elif error_code == ErrorMessage.Device.Code_0303026.code:
800            raise OHOSRpcPortNotFindError(ErrorMessage.Device.Code_0303026, device=self)
801        elif error_code == ErrorMessage.Device.Code_0303027.code:
802            raise OHOSRpcProcessNotFindError(ErrorMessage.Device.Code_0303023.format(rpc_mode), device=self)
803        elif error_code == ErrorMessage.Device.Code_0303028.code:
804            raise OHOSRpcPortNotFindError(ErrorMessage.Device.Code_0303024.format(rpc_mode), device=self)
805
806    def start_abc_rpc(self, re_install_rpc=False, reconnect=False):
807        if re_install_rpc:
808            try:
809                from devicetest.controllers.openharmony import OpenHarmony
810                OpenHarmony.init_agent_resource(self)
811            except ImportError as error:  # pylint:disable=undefined-variable
812                self.log.debug(str(error))
813                self.log.error('please check devicetest extension module is exist.')
814                raise error
815            except Exception as error:
816                self.log.debug(str(error))
817                self.log.error('root device init abc RPC error.')
818                raise error
819        if reconnect:
820            self.stop_harmony_rpc(kill_hap=False, reconnect=reconnect)
821        if self.is_bin and self.check_rpc_status(check_abc=True, check_times=1) == SUCCESS_CODE:
822            self.log.debug('Harmony abc rpc already start!!!!')
823            return
824        self.start_uitest()
825        time.sleep(1)
826        check_result = self.check_rpc_status(check_abc=True)
827        self.raise_exception(check_result)
828
829    def stop_harmony_rpc(self, kill_uitest=True, kill_hap=True, reconnect=False):
830        if not self.get_recover_state():
831            LOG.warning("device state is false, skip stop harmony rpc.")
832            return
833        proc_pids = self.get_devicetest_proc_pid()
834        for index, pid in enumerate(proc_pids):
835            if not kill_uitest and kill_hap and index == 1:
836                continue
837            if not kill_hap and kill_uitest and index == 2:
838                continue
839            if pid != "":
840                if reconnect:
841                    name = "uitest" if index != 2 else "devicetest"
842                    self._dump_pid_info(pid, name)
843                cmd = 'kill -9 {}'.format(pid)
844                ret = self.execute_shell_command(cmd)
845                if index == 2 and "Operation not permitted" in ret:
846                    stop_hap = 'aa force-stop {}'.format(DEVICETEST_HAP_PACKAGE_NAME)
847                    self.execute_shell_command(stop_hap)
848        self.wait_listen_port_disappear()
849
850    def wait_listen_port_disappear(self):
851        end_time = time.time() + 5
852        times = 0
853        while time.time() < end_time:
854            if times == 0:
855                is_print = True
856            else:
857                is_print = False
858            if not self.is_harmony_rpc_socket_running(self.d_port, is_print=is_print):
859                break
860            times += 1
861        if times > 0:
862            self.is_harmony_rpc_socket_running(self.d_port, is_print=True)
863
864    def _dump_pid_info(self, pid, name):
865        try:
866            path = os.path.join(self._device_report_path, "log", "pid_info")
867            if not os.path.exists(path):
868                os.makedirs(path)
869            file_path = os.path.join(path, "{}_pid_info_{}.txt".format(name, pid))
870            pid_info_file = os.open(file_path, os.O_WRONLY | os.O_CREAT | os.O_APPEND, FilePermission.mode_755)
871            ret = self.execute_shell_command("dumpcatcher -p {}".format(pid))
872            with os.fdopen(pid_info_file, "a") as pid_info_file_pipe:
873                pid_info_file_pipe.write(ret)
874        except Exception as e:
875            LOG.error("Dump {} pid info fail. Error: {}".format(pid, e))
876
877    # check uitest if running well, otherwise kill it first
878    def check_uitest_status(self):
879        if not self.is_root:
880            ret = self.execute_shell_command("uitest --version")
881            if "inaccessible or not found" in ret:
882                raise OHOSDeveloperModeNotTrueError(ErrorMessage.Device.Code_0303021, device=self)
883        self.log.debug('Check uitest running status.')
884        proc_pids = self.get_devicetest_proc_pid()
885        if proc_pids[2] != "" and not self._proxy:
886            self.execute_shell_command('kill -9 {}'.format(proc_pids[2]))
887        if self.is_bin and proc_pids[0] != "":
888            self.execute_shell_command('kill -9 {}'.format(proc_pids[0]))
889            self.log.debug('Uitest is running in normal mode, current mode is bin/abc, wait it exit.')
890        if not self.is_bin and proc_pids[1] != "":
891            self.execute_shell_command('kill -9 {}'.format(proc_pids[1]))
892            self.log.debug('Uitest is running in abc mode, current mode is normal, wait it exit.')
893        self.log.debug('Finish check uitest running status.')
894
895    def get_devicetest_proc_pid(self):
896        # # 0-uitest 1-uitest-sigleness 2-hap
897        proc_pids = [""] * 3
898        if not self.is_bin:
899            proc_pids[0] = self.execute_shell_command("pidof {}".format(UITEST_NAME)).strip()
900        else:
901            cmd = 'ps -ef | grep {}'.format(UITEST_SINGLENESS)
902            proc_running = self.execute_shell_command(cmd).strip()
903            proc_running = proc_running.split("\n")
904            for data in proc_running:
905                if UITEST_SINGLENESS in data and "grep" not in data and EXTENSION_NAME not in data:
906                    data = data.split()
907                    proc_pids[1] = data[1]
908        proc_pids[2] = self.execute_shell_command("pidof {}".format(DEVICETEST_HAP_PACKAGE_NAME)).strip()
909
910        return proc_pids
911
912    def is_harmony_rpc_running(self, check_abc=False):
913        proc_pids = self.get_devicetest_proc_pid()
914        if not self.is_bin:
915            self.log.debug('is_proc_running: agent pid: {}, uitest pid: {}'.format(proc_pids[2], proc_pids[0]))
916            if proc_pids[2] != "" and proc_pids[0] != "":
917                return True
918        else:
919            if check_abc:
920                self.log.debug('is_proc_running: uitest pid: {}'.format(proc_pids[1]))
921                if proc_pids[1] != "":
922                    return True
923            else:
924                self.log.debug('is_proc_running: agent pid: {}'.format(proc_pids[2]))
925                if proc_pids[2] != "":
926                    return True
927        return False
928
929    def is_harmony_rpc_socket_running(self, port: int, check_server: bool = True, is_print: bool = True) -> bool:
930        if not self.is_root:
931            return True
932        out = self.execute_shell_command("netstat -atn | grep :{}".format(port))
933        if is_print:
934            self.log.debug(out)
935        if out:
936            out = out.split("\n")
937            for data in out:
938                if check_server:
939                    if "LISTEN" in data and str(port) in data:
940                        return True
941                else:
942                    if "hdcd" in data and str(port) in data:
943                        return True
944        return False
945
946    def check_rpc_status(self, check_abc: bool = False, check_server: bool = True, check_times: int = 3) -> str:
947        port = self.d_port if not check_abc else self.abc_d_port
948        for i in range(check_times):
949            if self.is_harmony_rpc_running(check_abc):
950                break
951            else:
952                self.log.debug("check harmony rpc failed {} times, If is check bin(abc): {}, "
953                               "try to check again in 1 seconds".format(i + 1, check_abc))
954                time.sleep(1)
955        else:
956            self.log.debug(f"{check_times} times check failed.")
957            self.log.debug('Harmony rpc is not running!!!! If is check bin(abc): {}'.format(check_abc))
958            if check_abc:
959                return ErrorMessage.Device.Code_0303025.code
960            else:
961                return ErrorMessage.Device.Code_0303027.code
962
963        for i in range(check_times):
964            if self.is_harmony_rpc_socket_running(port, check_server=check_server):
965                break
966            else:
967                self.log.debug("Harmony rpc port is not find {} times, If is check bin(abc): {}, "
968                               "try to find again in 1 seconds".format(i + 1, check_abc))
969                time.sleep(1)
970        else:
971            self.log.debug('Harmony rpc port is not find!!!! If is check bin(abc): {}'.format(check_abc))
972            if check_abc:
973                return ErrorMessage.Device.Code_0303026.code
974            else:
975                return ErrorMessage.Device.Code_0303028.code
976        self.log.debug('Harmony rpc is running!!!! If is check abc: {}'.format(check_abc))
977        return SUCCESS_CODE
978
979    def call_proxy_listener(self):
980        if ((self.is_bin and self._abc_proxy) or
981                (not self.is_bin and self._proxy)):
982            if self.proxy_listener is not None:
983                self.proxy_listener(is_exception=True)
984        if self._proxy:
985            if self.win_proxy_listener is not None:
986                self.win_proxy_listener(is_exception=True)
987
988    def install_app(self, remote_path, command):
989        try:
990            ret = self.execute_shell_command(
991                "pm install %s %s" % (command, remote_path))
992            if ret is not None and str(
993                    ret) != "" and "Unknown option: -g" in str(ret):
994                return self.execute_shell_command(
995                    "pm install -r %s" % remote_path)
996            return ret
997        except Exception as error:
998            self.log.error("%s, maybe there has a warning box appears "
999                           "when installing RPC." % error)
1000            return False
1001
1002    def uninstall_app(self, package_name):
1003        try:
1004            ret = self.execute_shell_command("pm uninstall %s" % package_name)
1005            self.log.debug(ret)
1006            return ret
1007        except Exception as err:
1008            self.log.error('DeviceTest-20013 uninstall: %s' % str(err))
1009            return False
1010
1011    def check_need_install_bin(self):
1012        # check if agent.so exist
1013        if self._agent_mode == AgentMode.bin:
1014            ret = self.execute_shell_command("ls -l /data/local/tmp/agent.so")
1015        else:
1016            ret = self.execute_shell_command("ls -l /data/local/tmp/app.abc")
1017        LOG.debug(ret)
1018        if ret is None or "No such file or directory" in ret:
1019            return True
1020        return False
1021
1022    def reconnect(self, waittime=60, proxy=None):
1023        """
1024        @summary: Reconnect the device.
1025        """
1026        self.call_proxy_listener()
1027
1028        if not self.wait_for_boot_completion():
1029            if self._proxy:
1030                self._proxy.close()
1031            self._proxy = None
1032            if self._abc_proxy:
1033                self._abc_proxy.close()
1034            self._abc_proxy = None
1035            self._uitestdeamon = None
1036            self.remove_ports()
1037            raise Exception("Reconnect timed out.")
1038
1039        if not self.is_root and self._agent_mode == AgentMode.hap:
1040            LOG.debug("Reconnect device is not root, change hap mode to bin mode.")
1041            self._agent_mode = AgentMode.bin
1042
1043        if self._proxy and (proxy is None or proxy == AgentMode.hap):
1044            self.start_harmony_rpc(re_install_rpc=True, reconnect=True)
1045            self.fport_tcp_port(start_abc=False)
1046            try:
1047                self._proxy.init(port=self._h_port, addr=self.host, device=self)
1048            except Exception as _:
1049                time.sleep(3)
1050                self._proxy.init(port=self._h_port, addr=self.host, device=self)
1051
1052        if self.is_bin and self._abc_proxy and (proxy is None or proxy == AgentMode.bin):
1053            re_install = self.check_need_install_bin()
1054            self.start_abc_rpc(re_install_rpc=re_install, reconnect=True)
1055            self.fport_tcp_port(start_abc=True)
1056            try:
1057                self._abc_proxy.init(port=self._h_port, addr=self.host, device=self)
1058            except Exception as _:
1059                time.sleep(3)
1060                self._abc_proxy.init(port=self._h_port, addr=self.host, device=self)
1061
1062        if self._uitestdeamon is not None:
1063            self._uitestdeamon.init(self)
1064
1065        if self._proxy:
1066            return self._proxy
1067        return None
1068
1069    def fport_tcp_port(self, start_abc: bool = False) -> bool:
1070        filter_ports = []
1071        for i in range(3):
1072            host_port = self.get_local_port(start_abc=start_abc, filter_ports=filter_ports)
1073            remote_port = self.abc_d_port if start_abc else self.d_port
1074            cmd = "fport tcp:{} tcp:{}".format(host_port, remote_port)
1075            result = self.connector_command(cmd)
1076            if "Fail" not in result:
1077                self._h_port = host_port
1078                LOG.debug(f"hdc fport success, get_proxy host_port: {host_port}, remote_port: {remote_port}")
1079                return True
1080            filter_ports.append(host_port)
1081            LOG.debug(f"The {i + 1} time HDC fport tcp port fail.")
1082            from devicetest.utils.util import check_port_state
1083            check_port_state(host_port)
1084        else:
1085            err_msg = ErrorMessage.Device.Code_0303022
1086            LOG.error(err_msg)
1087            raise HDCFPortError(err_msg)
1088
1089    def get_local_port(self, start_abc: bool, filter_ports: list = None):
1090        if filter_ports is None:
1091            filter_ports = []
1092        from devicetest.utils.util import get_forward_port
1093        host = self.host
1094        port = None
1095        h_port = get_forward_port(self, host, port, filter_ports)
1096        if start_abc:
1097            self.remove_ports(normal=False)
1098            self.forward_ports_abc.append(h_port)
1099        else:
1100            self.remove_ports(abc=False)
1101            self.forward_ports.append(h_port)
1102        self.log.info("tcp forward port: {} for {}".format(
1103            h_port, convert_serial(self.device_sn)))
1104        return h_port
1105
1106    def remove_ports(self, abc: bool = True, normal: bool = True):
1107        if abc:
1108            for port in self.forward_ports_abc:
1109                cmd = "fport rm tcp:{} tcp:{}".format(
1110                    port, self.abc_d_port)
1111                self.connector_command(cmd)
1112            self.forward_ports_abc.clear()
1113        if normal:
1114            for port in self.forward_ports:
1115                cmd = "fport rm tcp:{} tcp:{}".format(
1116                    port, self.d_port)
1117                self.connector_command(cmd)
1118            self.forward_ports.clear()
1119
1120    def remove_history_ports(self, port):
1121        cmd = "fport ls"
1122        res = self.connector_command(cmd, is_print=False)
1123        res = res.split("\n")
1124        for data in res:
1125            if str(port) in data:
1126                data = data.split('\t')
1127                cmd = "fport rm {}".format(data[0][1:-1])
1128                self.connector_command(cmd, is_print=False)
1129
1130    def take_picture(self, name):
1131        """
1132        @summary: 截取手机屏幕图片并保存
1133        @param  name: 保存的图片名称,通过getTakePicturePath方法获取保存全路径
1134        """
1135        path = ""
1136        try:
1137            if self._device_report_path is None:
1138                from xdevice import EnvPool
1139                self._device_report_path = EnvPool.report_path
1140            temp_path = os.path.join(self._device_report_path, "temp")
1141            if not os.path.exists(temp_path):
1142                os.makedirs(temp_path)
1143            path = os.path.join(temp_path, name)
1144            picture_name = os.path.basename(name)
1145            out = self.execute_shell_command("snapshot_display -f /data/local/tmp/{}".format(picture_name))
1146            self.log.debug("result: {}".format(out))
1147            if "error" in out and "success" not in out:
1148                return False
1149            else:
1150                self.pull_file("/data/local/tmp/{}".format(picture_name), path)
1151        except Exception as error:
1152            self.log.error("devicetest take_picture: {}".format(str(error)))
1153        return path
1154
1155    def capture(self, link: str, path: str, ext: str = ".png") -> Tuple[str, str]:
1156        """
1157        截图步骤实现,未使用参数是保持一致
1158        :param link: 链接
1159        :param path: 保存路径
1160        :param ext: 后缀
1161        :return: link path 链接
1162        """
1163        remote = "/data/local/tmp/xdevice_screenshot{}".format(ext)
1164        new_ext = ".jpeg"
1165        link = link[:link.rfind(ext)] + new_ext
1166        path = path[:path.rfind(ext)] + new_ext
1167        remote = remote[:remote.rfind(ext)] + new_ext
1168        result = self.execute_shell_command("snapshot_display -f {}".format(remote), timeout=60000)
1169        LOG.debug("{}".format(result))
1170        # 适配非root
1171        if not self.is_root:
1172            time.sleep(1)
1173        self.pull_file(remote, path)
1174        self.execute_shell_command("rm -f {}".format(remote))
1175        return link, path
1176
1177    def set_device_report_path(self, path):
1178        self._device_report_path = path
1179
1180    def get_device_report_path(self):
1181        return self._device_report_path
1182
1183    def get_device_params(self, refresh=True):
1184        """
1185        获取设备属性信息
1186        @return:
1187        """
1188        if refresh:
1189            for key, value in self.device_params_command.items():
1190                if value and isinstance(value, str):
1191                    self.device_params[key] = self.get_property_value(value)
1192            self.device_params[DeviceProperties.sn] = self.device_sn
1193            try:
1194                result = self.execute_shell_command(
1195                    "snapshot_display -f /data/local/tmp/screen.png")
1196                if "success" not in result or "successfully" not in result:
1197                    result = self.execute_shell_command(
1198                        "snapshot_display -f /data/local/tmp/screen.jpeg")
1199                pattern = re.search(r"width \d+. height \d+", result)
1200                resolution = re.findall(r"\d+", pattern.group())
1201                self.device_params[DeviceProperties.xres] = resolution[0]
1202                self.device_params[DeviceProperties.yres] = resolution[1]
1203            except Exception as error:
1204                resolution = self.uitestdeamon.get_display_density()
1205                if resolution:
1206                    resolution = json.loads(resolution)
1207                    self.device_params[DeviceProperties.xres] = resolution.get("X",
1208                                                                               "")
1209                    self.device_params[DeviceProperties.yres] = resolution.get("Y",
1210                                                                               "")
1211        return copy.deepcopy(self.device_params)
1212
1213    def execute_shell_in_daemon(self, command):
1214        if self.host != "127.0.0.1":
1215            cmd = [HdcHelper.CONNECTOR_NAME, "-s", "{}:{}".format(
1216                self.host, self.port), "-t", self.device_sn, "shell"]
1217        else:
1218            cmd = [HdcHelper.CONNECTOR_NAME, "-t", self.device_sn, "shell"]
1219        LOG.debug("{} execute command {} {} in daemon".format(
1220            convert_serial(self.device_sn), HdcHelper.CONNECTOR_NAME, command))
1221        if isinstance(command, list):
1222            cmd.extend(command)
1223        else:
1224            command = command.strip()
1225            cmd.extend(command.split(" "))
1226        sys_type = platform.system()
1227        process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
1228                                   shell=False,
1229                                   preexec_fn=None if sys_type == "Windows"
1230                                   else os.setsid,
1231                                   close_fds=True)
1232        return process
1233
1234    def check_advance_option(self, extend_value, **kwargs):
1235        if not isinstance(extend_value, dict):
1236            return True
1237
1238        advance_dict = extend_value.get(AdvanceDeviceOption.advance, None)
1239        if not isinstance(advance_dict, dict):
1240            return True
1241        # 匹配设备别名
1242        adv_alias = advance_dict.get(DeviceProperties.alias, "")
1243        adv_label = advance_dict.get(AdvanceDeviceOption.label, "")
1244        alias = (adv_alias or adv_label).strip().upper()
1245        if alias:
1246            is_matched = False
1247            selection = "selection:{alias:%s}" % alias
1248            # 兼容-di参数
1249            device_info = kwargs.get("device_info", None)
1250            if device_info and isinstance(device_info, list):
1251                di_alias = ""
1252                for info in device_info:
1253                    if not isinstance(info, dict) or info.get("sn", "") != self.device_sn:
1254                        continue
1255                    di_alias = info.get("type", "")
1256                    is_matched = di_alias == alias
1257                    break
1258                if not is_matched:
1259                    LOG.error("device:{sn:%s, alias:%s} mismatch %s, please check "
1260                              "the [-di] running params!" % (self.device_sn, di_alias, selection))
1261                    LOG.info("current [-di] running params is: %s" % device_info)
1262                    return False
1263                self.device_id = di_alias
1264            elif self.device_id == alias:
1265                is_matched = True
1266            if not is_matched:
1267                LOG.error("device:{sn:%s, alias:%s} mismatch %s" % (
1268                    self.device_sn, self.device_id, selection))
1269                return False
1270
1271        # 匹配设备额外的信息
1272        advance_type = advance_dict.get(AdvanceDeviceOption.type, None)
1273        advance_product = advance_dict.get(AdvanceDeviceOption.product, None)
1274        advance_version = advance_dict.get(AdvanceDeviceOption.version, None)
1275        advance_product_cmd = advance_dict.get(AdvanceDeviceOption.product_cmd, None)
1276        advance_version_cmd = advance_dict.get(AdvanceDeviceOption.version_cmd, None)
1277        if advance_type and advance_type == AdvanceDeviceOption.command \
1278                and advance_product_cmd \
1279                and advance_version_cmd:
1280            if advance_product is not None:
1281                self.device_params[DeviceProperties.model] = \
1282                    self.execute_shell_command(advance_product_cmd).strip()
1283            if advance_version is not None:
1284                self.device_params[DeviceProperties.system_version] = \
1285                    self.execute_shell_command(advance_version_cmd).strip()
1286        else:
1287            if advance_product is not None:
1288                self.device_params[DeviceProperties.model] = \
1289                    self.get_property(self.device_params_command.get(DeviceProperties.model, ""))
1290            if advance_version is not None:
1291                self.device_params[DeviceProperties.system_version] = \
1292                    self.get_property(self.device_params_command.get(DeviceProperties.system_version, ""))
1293
1294        if advance_product and advance_version:
1295            return True if advance_product == self.device_params.get(DeviceProperties.model, "") \
1296                           and advance_version == self.device_params.get(DeviceProperties.system_version, "") else False
1297        elif advance_product and advance_version is None:
1298            return True if advance_product == self.device_params.get(DeviceProperties.model, "") else False
1299        elif advance_product is None and advance_version:
1300            return True if advance_version == self.device_params.get(DeviceProperties.system_version, "") else False
1301        else:
1302            return True
1303
1304    @property
1305    def webview(self):
1306        from devicetest.controllers.web.webview import WebView
1307        if self._webview is None:
1308            self._webview = WebView(self)
1309        return self._webview
1310
1311
1312class DeviceLogCollector:
1313    hilog_file_address = []
1314    log_file_address = []
1315    hdc_module_name = ""
1316    device = None
1317    restart_proc = []
1318    device_log_level = None
1319    is_clear = True
1320    device_hilog_proc = None
1321    need_pull_hdc_log = False  # 是否需要拉取hdc日志
1322
1323    # log
1324    hilog_file_pipes = []
1325    device_log = dict()
1326    hilog = dict()
1327    log_proc = dict()
1328    hilog_proc = dict()
1329
1330    _cur_thread_ident = None
1331    _cur_thread_name = None
1332    _hilog_begin_time = None
1333    _latest_pull_abnormal_log_time = time.time()
1334
1335    def __init__(self, device):
1336        self.device = device
1337
1338    def restart_catch_device_log(self):
1339        self._sync_device_time()
1340        for _, path in enumerate(self.hilog_file_address):
1341            hilog_open = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_APPEND,
1342                                 FilePermission.mode_755)
1343            with os.fdopen(hilog_open, "a") as hilog_file_pipe:
1344                _, proc = self.start_catch_device_log(hilog_file_pipe=hilog_file_pipe)
1345                self.restart_proc.append(proc)
1346
1347    def stop_restart_catch_device_log(self):
1348        # when device free stop restart log proc
1349        for _, proc in enumerate(self.restart_proc):
1350            self.stop_catch_device_log(proc)
1351        self.restart_proc.clear()
1352        self.hilog_file_address.clear()
1353        self.log_file_address.clear()
1354
1355    def _set_device_log_level(self, **kwargs):
1356        # 设备日志级别
1357        if not self.device_log_level:
1358            log_level = kwargs.get("log_level", "INFO")
1359            if log_level not in LOGLEVEL:
1360                self.device_log_level = "INFO"
1361            else:
1362                self.device_log_level = log_level
1363        cmd = "hilog -b {}".format(self.device_log_level)
1364        self.device.execute_shell_command(cmd)
1365
1366    def _set_hilog_begin_time(self):
1367        """设置日志抓取任务的开始时间"""
1368        cur_thread = threading.current_thread()
1369        cur_thread_id, cur_thread_name = cur_thread.ident, cur_thread.name
1370        if self._cur_thread_ident != cur_thread_id or self._cur_thread_name != cur_thread_name:
1371            # 用例连续运行,执行线程会变换,这时更新线程id和开始时间
1372            self._cur_thread_ident, self._cur_thread_name = cur_thread_id, cur_thread_name
1373            self._hilog_begin_time = time.time()
1374
1375    def start_catch_device_log(self, log_file_pipe=None, hilog_file_pipe=None, **kwargs):
1376        """
1377        Starts hdc log for each device in separate subprocesses and save
1378        the logs in files.
1379        """
1380        self._sync_device_time()
1381        self._set_device_log_level(**kwargs)
1382        self._set_hilog_begin_time()
1383
1384        device_hilog_proc = None
1385        if hilog_file_pipe:
1386            command = "hilog"
1387            if self.device.host != "127.0.0.1":
1388                cmd = [HdcHelper.CONNECTOR_NAME, "-s", "{}:{}".format(self.device.host, self.device.port),
1389                       "-t", self.device.device_sn, "shell", command]
1390            else:
1391                cmd = [HdcHelper.CONNECTOR_NAME, "-t", self.device.device_sn, "shell", command]
1392            LOG.info("execute command: %s" % " ".join(cmd).replace(
1393                self.device.device_sn, convert_serial(self.device.device_sn)))
1394            device_hilog_proc = start_standing_subprocess(
1395                cmd, hilog_file_pipe)
1396        self.device_hilog_proc = device_hilog_proc
1397        return None, device_hilog_proc
1398
1399    def stop_catch_device_log(self, proc):
1400        """
1401        Stops all hdc log subprocesses.
1402        """
1403        if proc:
1404            self.device.log.debug("Stop catch device hilog.")
1405            stop_standing_subprocess(proc)
1406        if self.hdc_module_name:
1407            self.pull_hdc_log(self.hdc_module_name)
1408            self.hdc_module_name = None
1409
1410    def start_hilog_task(self, **kwargs):
1411        """启动日志抓取任务。若设备没有在抓取日志,则设置启动抓取(不删除历史日志,以免影响其他组件运行)"""
1412        log_size = kwargs.get("log_size", "10M").upper()
1413        if re.search("^[0-9]+[K?]$", log_size) is None \
1414                and re.search("^[0-9]+[M?]$", log_size) is None:
1415            self.device.log.debug("hilog task Invalid size string {}. Use default 10M".format(log_size))
1416            log_size = "10M"
1417        matcher = re.match("^[0-9]+", log_size)
1418        if log_size.endswith("K") and int(matcher.group(0)) < 64:
1419            self.device.log.debug("hilog task file size should be "
1420                                  "in range [64.0K, 512.0M], use min value 64K, now is {}".format(log_size))
1421            log_size = "64K"
1422        if log_size.endswith("M") and int(matcher.group(0)) > 512:
1423            self.device.log.debug("hilog task file size should be "
1424                                  "in range [64.0K, 512.0M], use min value 512M, now is {}".format(log_size))
1425            log_size = "512M"
1426
1427        self._sync_device_time()
1428        self._set_device_log_level(**kwargs)
1429        self._set_hilog_begin_time()
1430
1431        # 启动日志任务
1432        out = self.device.execute_shell_command('hilog -w query')
1433        LOG.debug(out)
1434        if 'No running persistent task' in out:
1435            # 启动hilog日志任务
1436            self.device.execute_shell_command('hilog -w start -l {} -n 1000'.format(log_size))
1437        if 'kmsg' not in out:
1438            # 启动kmsg日志任务
1439            self.device.execute_shell_command('hilog -w start -t kmsg -l {} -n 1000'.format(log_size))
1440
1441    def stop_hilog_task(self, log_name, repeat=1, repeat_round=1, **kwargs):
1442        module_name = kwargs.get("module_name", "")
1443        round_folder = f"round{repeat_round}" if repeat > 1 else ""
1444        base_dir = os.path.join(self.device.get_device_report_path(), "log", round_folder)
1445        if module_name:
1446            path = os.path.join(base_dir, module_name)
1447        else:
1448            path = base_dir
1449        os.makedirs(path, exist_ok=True)
1450
1451        # 获取hilog日志
1452        hilog_local = os.path.join(path, "hilog_{}".format(log_name))
1453        self.get_period_log({HILOG_PATH: ""}, hilog_local)
1454        # 拉取最新的字典文件。若hilog都没拉出来,字典文件也不用拉取了
1455        if os.path.exists(hilog_local):
1456            out = self.device.execute_shell_command('ls -t {} | grep hilog_dict'.format(HILOG_PATH))
1457            LOG.debug(out)
1458            log_dicts = out.strip().replace('\r', '').split('\n') if out else []
1459            if log_dicts:
1460                self.device.pull_file(HILOG_PATH + '/' + log_dicts[0], hilog_local, retry=0)
1461            else:
1462                LOG.warning("hilog_dict does not exist, and it won't be pulled")
1463
1464        # 获取crash日志
1465        self.start_get_crash_log(log_name, repeat=repeat, repeat_round=repeat_round, module_name=module_name)
1466        # 获取额外路径的日志
1467        extras_dirs = kwargs.get("extras_dirs", "")
1468        self.pull_extra_log_files(log_name, module_name, extras_dirs, round_folder=round_folder)
1469        # 获取hdc日志
1470        self.pull_hdc_log(module_name, round_folder=round_folder)
1471
1472    def pull_hdc_log(self, module_name, round_folder=""):
1473        if not self.need_pull_hdc_log:
1474            return
1475        report_path = self.device.get_device_report_path()
1476        if not report_path:
1477            return
1478        hdc_log_save_path = os.path.join(
1479            report_path, "log", round_folder, module_name, "hdc_log")
1480        if not os.path.exists(hdc_log_save_path):
1481            os.makedirs(hdc_log_save_path)
1482        temp_dir = tempfile.gettempdir()
1483        files = os.listdir(temp_dir)
1484        for file in files:
1485            if "hdc.log" in file or "hdclast.log" in file:
1486                hdc_log = os.path.join(temp_dir, file)
1487                shutil.copy(hdc_log, hdc_log_save_path)
1488
1489    def start_get_crash_log(self, task_name, repeat=1, repeat_round=1, **kwargs):
1490        self._set_hilog_begin_time()
1491        module_name = kwargs.get("module_name", "")
1492        round_folder = f"round{repeat_round}" if repeat > 1 else ""
1493        base_dir = os.path.join(self.device.get_device_report_path(), "log", round_folder)
1494        crash_folder = f"crash_log_{task_name}"
1495        if module_name:
1496            crash_path = os.path.join(base_dir, module_name, crash_folder)
1497        else:
1498            crash_path = os.path.join(base_dir, crash_folder)
1499
1500        crash_logs = {
1501            NATIVE_CRASH_PATH: ["cppcrash"],
1502            # JS_CRASH_PATH设为空,表示拉取这个路径下用例运行期间生成的文件
1503            JS_CRASH_PATH: [],
1504            ROOT_PATH: ["SERVICE_BLOCK", "appfreeze"]
1505        }
1506        remotes = {}
1507        for base_path, folders in crash_logs.items():
1508            for folder in folders:
1509                remote_dir = base_path + '/' + folder if folder else base_path
1510                remotes.update({remote_dir: ""})
1511            else:
1512                remotes.update({base_path: ""})
1513        self.get_period_log(remotes, crash_path)
1514
1515    def clear_crash_log(self):
1516        warnings.warn('this function is no longer supported', DeprecationWarning)
1517
1518    def _sync_device_time(self):
1519        # 先同步PC和设备的时间
1520        iso_time_format = '%Y-%m-%d %H:%M:%S'
1521        cur_time = get_cst_time().strftime(iso_time_format)
1522        self.device.execute_shell_command("date '{}'".format(cur_time))
1523
1524    def add_log_address(self, log_file_address, hilog_file_address):
1525        # record to restart catch log when reboot device
1526        if log_file_address:
1527            self.log_file_address.append(log_file_address)
1528        if hilog_file_address:
1529            self.hilog_file_address.append(hilog_file_address)
1530            self.hdc_module_name = os.path.basename(os.path.dirname(hilog_file_address))
1531
1532    def remove_log_address(self, log_file_address, hilog_file_address):
1533        if log_file_address and log_file_address in self.log_file_address:
1534            self.log_file_address.remove(log_file_address)
1535        if hilog_file_address and hilog_file_address in self.hilog_file_address:
1536            self.hilog_file_address.remove(hilog_file_address)
1537
1538    def pull_extra_log_files(self, task_name: str, module_name: str, dirs: str, round_folder: str = ""):
1539        if not dirs or dirs == 'None':
1540            return
1541        extra_log_path = os.path.join(
1542            self.device.get_device_report_path(), "log", round_folder,
1543            module_name, "extra_log_{}".format(task_name))
1544        remotes = {}
1545        for item in dirs.split(';'):
1546            item = item.strip().rstrip('/')
1547            if not item:
1548                continue
1549            # 若是文件夹,则保存在本地的同名文件夹内
1550            on_folder = os.path.basename(item) if self.device.is_directory(item) else ""
1551            remotes.update({item: on_folder})
1552        self.get_period_log(remotes, extra_log_path)
1553
1554    def clear_device_logs(self):
1555        """清除设备侧日志"""
1556        warnings.warn('this function is no longer supported', DeprecationWarning)
1557
1558    def clear_kingking_dir_log(self):
1559        def execute_clear_cmd(path: str, prefix: list):
1560            for pre in prefix:
1561                clear_cmd = "rm -f {}/{}/*".format(path, pre)
1562                self.device.execute_shell_command(clear_cmd)
1563
1564        execute_clear_cmd(KINGKONG_PATH, ["data", "fault_route", "screenshots"])
1565
1566    def get_abnormal_hilog(self, local_hilog_path):
1567        warnings.warn('this function is no longer supported', DeprecationWarning)
1568
1569    def get_period_log(self, remotes: dict, local_path: str, begin_time: float = None, find_cmd: str = None):
1570        """在目录下查找一段时间内有更改的文件,并将文件拉到本地
1571        remotes: dict, {查找目录: 使用子文件夹存放文件(通常不用子文件夹)}
1572        local_path: str, pull to local path
1573        begin_time: float, the beginning time
1574        """
1575        begin = begin_time if begin_time else self._hilog_begin_time
1576        if not begin:
1577            LOG.warning('hilog task begin time is not set')
1578            return
1579        minutes, seconds = divmod(int(time.time() - begin), 60)
1580        if minutes < 0:
1581            LOG.warning('get logs in a period failed!')
1582            LOG.warning('当前日志打印的时间先与开始抓取日志的时间')
1583            return
1584        if minutes > 0:
1585            units = '%dm' % minutes
1586        else:
1587            units = '%ds' % seconds
1588
1589        for remote_dir, on_folder in remotes.items():
1590            find = find_cmd if find_cmd else 'find {}'.format(remote_dir)
1591            cmd = '{} -type f -mtime -{}'.format(find, units)
1592            out = self.device.execute_shell_command(cmd)
1593            if 'No such file or directory' in out:
1594                continue
1595            LOG.debug(out)
1596            log_files = [f for f in out.strip().replace('\r', '').split('\n') if f and f.startswith(remote_dir)]
1597            if not log_files:
1598                continue
1599            local_dir = os.path.join(local_path, on_folder) if on_folder else local_path
1600            os.makedirs(local_dir, exist_ok=True)
1601            os.chmod(local_dir, FilePermission.mode_755)
1602            for log_file in log_files:
1603                # 避免将整个文件夹拉下来和重复拉取文件
1604                if log_file == remote_dir and self.device.is_directory(log_file) \
1605                        or os.path.exists(log_file) and os.path.isfile(log_file):
1606                    continue
1607                self.device.pull_file(log_file, local_dir, retry=0)
1608
1609    def start_catch_log(self, request, **kwargs):
1610        hilog_size = kwargs.get("hilog_size", "10M")
1611        log_level = request.config.device_log.get(ConfigConst.tag_loglevel, "INFO")
1612        pull_hdc_log_status = request.config.device_log.get(ConfigConst.tag_hdc, None)
1613        self.need_pull_hdc_log = False if pull_hdc_log_status and pull_hdc_log_status.lower() == "false" else True
1614        self.device.set_device_report_path(request.config.report_path)
1615        self.start_hilog_task(log_size=hilog_size, log_level=log_level)
1616
1617    def stop_catch_log(self, request, **kwargs):
1618        self.remove_log_address(self.device_log.get(self.device.device_sn, None),
1619                                self.hilog.get(self.device.device_sn, None))
1620        serial = "{}_{}".format(str(self.device.__get_serial__()), time.time_ns())
1621        log_tar_file_name = "{}".format(str(serial).replace(":", "_"))
1622        self.stop_hilog_task(
1623            log_tar_file_name,
1624            module_name=request.get_module_name(),
1625            extras_dirs=request.config.device_log.get(ConfigConst.tag_dir),
1626            repeat=request.config.repeat,
1627            repeat_round=request.get_repeat_round())
1628