• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import platform
21import socket
22import struct
23import threading
24import time
25import shutil
26import stat
27from dataclasses import dataclass
28
29from xdevice import DeviceOsType
30from xdevice import ReportException
31from xdevice import ExecuteTerminate
32from xdevice import platform_logger
33from xdevice import Plugin
34from xdevice import get_plugin
35from xdevice import IShellReceiver
36from xdevice import exec_cmd
37from xdevice import FilePermission
38from xdevice import DeviceError
39from xdevice import HdcError
40from xdevice import HdcCommandRejectedException
41from xdevice import ShellCommandUnresponsiveException
42from xdevice import DeviceState
43from xdevice import convert_serial
44from xdevice import convert_mac
45from xdevice import is_proc_running
46from xdevice import convert_ip
47from xdevice import create_dir
48from ohos.error import ErrorMessage
49
# Four-byte identifiers placed at the start of sync-protocol packets
# (SyncService.check_result compares the first four bytes of a reply
# against one of these).
ID_OKAY = b'OKAY'
ID_FAIL = b'FAIL'
ID_STAT = b'STAT'
ID_RECV = b'RECV'
ID_DATA = b'DATA'
ID_DONE = b'DONE'
ID_SEND = b'SEND'
ID_LIST = b'LIST'
ID_DENT = b'DENT'

# Text encodings and size limits.
DEFAULT_ENCODING = "utf-8"
COMPATIBLE_ENCODING = "ISO-8859-1"
# Largest data chunk accepted/sent in one DATA packet (bytes).
SYNC_DATA_MAX = 64 * 1024
# Longest encoded remote path accepted by the push/pull helpers (bytes).
REMOTE_PATH_MAX_LENGTH = 1024
SOCK_DATA_MAX = 256

# NOTE(review): presumably milliseconds (open_sync logs timeout / 1000
# as the timeout value) - confirm against HdcHelper.socket.
INSTALL_TIMEOUT = 2 * 60 * 1000
DEFAULT_TIMEOUT = 40 * 1000

# Connection retries allowed before _init_hdc_connection gives up.
MAX_CONNECT_ATTEMPT_COUNT = 10
# Size in bytes of one protocol word (identifier or 32-bit integer).
DATA_UNIT_LENGTH = 4
HEXADECIMAL_NUMBER = 16
# 41471 == 0o120777; pull_file skips remote entries reporting this mode.
SPECIAL_FILE_MODE = 41471
# Width of the length field written by the create_*_req helpers.
FORMAT_BYTES_LENGTH = 4
# Offset of the first integer that follows the 4-byte identifier.
DEFAULT_OFFSET_OF_INT = 4

# Returned by SyncService.read_mode when the STAT reply is not recognised.
INVALID_MODE_CODE = -1
DEFAULT_STD_PORT = 8710
HDC_NAME = "hdc"
HDC_STD_NAME = "hdc_std"
LOG = platform_logger("Hdc")
81
82
83class HdcMonitor:
84    """
85    A Device monitor.
86    This monitor connects to the Device Connector, gets device and
87    debuggable process information from it.
88    """
89    MONITOR_MAP = {}
90
91    def __init__(self, host="127.0.0.1", port=None, device_connector=None):
92        self.channel = dict()
93        self.channel.setdefault("host", host)
94        self.channel.setdefault("port", port)
95        self.main_hdc_connection = None
96        self.connection_attempt = 0
97        self.is_stop = False
98        self.monitoring = False
99        self.server = device_connector
100        self.devices = []
101        self.last_msg_len = 0
102        self.changed = True
103
104    @staticmethod
105    def get_instance(host, port=None, device_connector=None):
106        if host not in HdcMonitor.MONITOR_MAP:
107            monitor = HdcMonitor(host, port, device_connector)
108            HdcMonitor.MONITOR_MAP[host] = monitor
109            LOG.debug("HdcMonitor map add host %s, map is %s" %
110                      (host, HdcMonitor.MONITOR_MAP))
111
112        return HdcMonitor.MONITOR_MAP[host]
113
114    def start(self):
115        """
116        Starts the monitoring.
117        """
118        try:
119            server_thread = threading.Thread(target=self.loop_monitor,
120                                             name="HdcMonitor", args=())
121            server_thread.daemon = True
122            server_thread.start()
123        except FileNotFoundError as _:
124            LOG.error("HdcMonitor can't find connector, init device "
125                      "environment failed!")
126
127    def init_hdc(self, connector_name=HDC_NAME):
128        env_hdc = shutil.which(connector_name)
129        # if not, add xdevice's own hdc path to environ path.
130        # tell if hdc has already been in the environ path.
131        if env_hdc is None:
132            self.is_stop = True
133            LOG.error("Can not find {} or {} environment variable, please set first!".format(HDC_NAME, HDC_STD_NAME))
134            LOG.error("Stop {} monitor!".format(HdcHelper.CONNECTOR_NAME))
135        if not is_proc_running(connector_name):
136            port = DEFAULT_STD_PORT
137            self.start_hdc(
138                connector=connector_name,
139                local_port=self.channel.setdefault(
140                    "port", port))
141            time.sleep(1)
142
143    def _init_hdc_connection(self):
144        if self.main_hdc_connection is not None:
145            return
146        # set all devices disconnect
147        devices = [item for item in self.devices]
148        devices.reverse()
149        for local_device1 in devices:
150            local_device1.device_state = DeviceState.OFFLINE
151            self.server.device_changed(local_device1)
152
153        connector_name = HDC_STD_NAME if HdcHelper.is_hdc_std() else HDC_NAME
154        self.init_hdc(connector_name)
155        self.connection_attempt = 0
156        self.monitoring = False
157        while self.main_hdc_connection is None:
158            self.main_hdc_connection = self.open_hdc_connection()
159            if self.main_hdc_connection is None:
160                self.connection_attempt += 1
161                if self.connection_attempt > MAX_CONNECT_ATTEMPT_COUNT:
162                    LOG.error(
163                        "HdcMonitor attempt %s, can't connect to hdc "
164                        "for Device List Monitoring" %
165                        str(self.connection_attempt))
166                    raise HdcError(ErrorMessage.Hdc.Code_0304001.format(
167                        self.channel.get("host"), str(self.channel.get("port"))))
168
169                LOG.debug(
170                    "HdcMonitor Connection attempts: %s" %
171                    str(self.connection_attempt))
172                time.sleep(2)
173
174    def stop(self):
175        """
176        Stops the monitoring.
177        """
178        for host in HdcMonitor.MONITOR_MAP:
179            LOG.debug("HdcMonitor stop host %s" % host)
180            monitor = HdcMonitor.MONITOR_MAP[host]
181            try:
182                monitor.is_stop = True
183                if monitor.main_hdc_connection is not None:
184                    monitor.main_hdc_connection.shutdown(2)
185                    monitor.main_hdc_connection.close()
186                    monitor.main_hdc_connection = None
187            except (socket.error, socket.gaierror, socket.timeout) as _:
188                LOG.error("HdcMonitor close socket exception")
189        HdcMonitor.MONITOR_MAP.clear()
190        LOG.debug("HdcMonitor {} monitor stop!".format(HdcHelper.CONNECTOR_NAME))
191        LOG.debug("HdcMonitor map is %s" % HdcMonitor.MONITOR_MAP)
192
193    def loop_monitor(self):
194        """
195        Monitors the devices. This connects to the Debug Bridge
196        """
197        LOG.debug("current connector name is %s" % HdcHelper.CONNECTOR_NAME)
198        while not self.is_stop:
199            try:
200                if self.main_hdc_connection is None:
201                    self._init_hdc_connection()
202                    if self.main_hdc_connection is not None:
203                        LOG.debug(
204                            "HdcMonitor Connected to hdc for device "
205                            "monitoring, main_hdc_connection is %s" %
206                            self.main_hdc_connection)
207
208                self.list_targets()
209                time.sleep(2)
210            except (HdcError, Exception) as _:
211                self.handle_exception_monitor_loop()
212                time.sleep(2)
213
214    def handle_exception_monitor_loop(self):
215        LOG.debug("Handle exception monitor loop: %s" %
216                  self.main_hdc_connection)
217        if self.main_hdc_connection is None:
218            return
219        LOG.debug("Handle exception monitor loop, main hdc connection closed, "
220                  "main hdc connection: %s" % self.main_hdc_connection)
221        self.main_hdc_connection.close()
222        self.main_hdc_connection = None
223
224    def _get_device_instance(self, items, os_type):
225        device = get_plugin(plugin_type=Plugin.DEVICE, plugin_id=os_type)[0]
226        device_instance = device.__class__()
227        device_instance.__set_serial__(items[0])
228        device_instance.host = self.channel.get("host")
229        device_instance.port = self.channel.get("port")
230        if self.changed:
231            if DeviceState.get_state(items[3]) == DeviceState.CONNECTED:
232                LOG.debug("Dmlib get device instance {} {} {}, status: {}".format(
233                    device_instance.device_sn, device_instance.host, device_instance.port, items[3]))
234            else:
235                LOG.debug("Dmlib ignore device instance {} {} {}, status: {}".format(
236                    device_instance.device_sn, device_instance.host, device_instance.port, items[3]))
237        device_instance.device_state = DeviceState.get_state(items[3])
238        return device_instance
239
240    def update_devices(self, param_array_list):
241        devices = [item for item in self.devices]
242        devices.reverse()
243        for local_device1 in devices:
244            k = 0
245            for local_device2 in param_array_list:
246                if local_device1.device_sn == local_device2.device_sn and \
247                        local_device1.device_os_type == \
248                        local_device2.device_os_type:
249                    k = 1
250                    if local_device1.device_state != \
251                            local_device2.device_state:
252                        local_device1.device_state = local_device2.device_state
253                        self.server.device_changed(local_device1)
254                    param_array_list.remove(local_device2)
255                    break
256
257            if k == 0:
258                self.devices.remove(local_device1)
259                self.server.device_disconnected(local_device1)
260        for local_device in param_array_list:
261            self.devices.append(local_device)
262            self.server.device_connected(local_device)
263
264    def open_hdc_connection(self):
265        """
266        Attempts to connect to the debug bridge server. Return a connect socket
267        if success, null otherwise.
268        """
269        try:
270            LOG.debug(
271                "HdcMonitor connecting to hdc for Device List Monitoring")
272            LOG.debug("HdcMonitor socket connection host: %s, port: %s" %
273                      (str(convert_ip(self.channel.get("host"))),
274                       str(int(self.channel.get("port")))))
275
276            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
277            sock.connect((self.channel.get("host"),
278                          int(self.channel.get("port"))))
279            return sock
280
281        except (socket.error, socket.gaierror, socket.timeout) as exception:
282            LOG.error("HdcMonitor hdc socket connection Error: %s, "
283                      "host is %s, port is %s" % (str(exception),
284                                                  self.channel.get("host"),
285                                                  self.channel.get("port")))
286            return None
287
288    def start_hdc(self, connector=HDC_NAME, kill=False, local_port=None):
289        """Starts the hdc host side server.
290
291        Args:
292            connector: connector type, like "hdc"
293            kill: if True, kill exist host side server
294            local_port: local port to start host side server
295
296        Returns:
297            None
298        """
299        if kill:
300            LOG.debug("HdcMonitor {} kill".format(connector))
301            exec_cmd([connector, "kill"])
302        LOG.debug("HdcMonitor {} start".format(connector))
303        exec_cmd(
304            [connector, "-l5", "start"],
305            error_print=False, redirect=True)
306
307    def list_targets(self):
308        if self.main_hdc_connection:
309            self.server.monitor_lock.acquire(timeout=1)
310            try:
311                self.monitoring_list_targets()
312                len_buf = HdcHelper.read(self.main_hdc_connection,
313                                         DATA_UNIT_LENGTH)
314                length = struct.unpack("!I", len_buf)[0]
315                if length >= 0:
316                    if self.last_msg_len != length:
317                        LOG.debug("had received length is: %s" % length)
318                        self.last_msg_len = length
319                        self.changed = True
320                    else:
321                        self.changed = False
322                    self.connection_attempt = 0
323                    self.process_incoming_target_data(length)
324            except Exception as e:
325                LOG.error(e)
326                raise e
327            finally:
328                self.server.monitor_lock.release()
329
330    def monitoring_list_targets(self):
331        try:
332            if not self.monitoring:
333                self.main_hdc_connection.settimeout(3)
334                HdcHelper.handle_shake(self.main_hdc_connection)
335                self.main_hdc_connection.settimeout(None)
336                request = HdcHelper.form_hdc_request("alive")
337                HdcHelper.write(self.main_hdc_connection, request)
338                self.monitoring = True
339        except socket.timeout as e:
340            LOG.error("HdcMonitor Connection handshake: error {}".format(e))
341            time.sleep(3)
342            raise HdcError(ErrorMessage.Hdc.Code_0304001.format(
343                self.channel.get("host"), self.channel.get("port"))) from e
344        request = HdcHelper.form_hdc_request('list targets -v')
345        HdcHelper.write(self.main_hdc_connection, request)
346
347    def process_incoming_target_data(self, length):
348        local_array_list = []
349        data_buf = HdcHelper.read(self.main_hdc_connection, length)
350        data_str = HdcHelper.reply_to_string(data_buf)
351        if 'Empty' not in data_str:
352            lines = data_str.split('\n')
353            for line in lines:
354                items = line.strip().split('\t')
355                # Example: sn    USB     Offline localhost       hdc
356                if not items[0] or len(items) < 5:
357                    continue
358                device_instance = self._get_device_instance(
359                    items, DeviceOsType.default)
360                local_array_list.append(device_instance)
361        else:
362            if self.changed:
363                LOG.debug("please check device actually.[%s]" % data_str.strip())
364        self.update_devices(local_array_list)
365
366    @staticmethod
367    def peek_hdc():
368        LOG.debug("Peek running process to check expect connector.")
369        # if not find hdc_std, find hdc
370        connector_name = HDC_NAME
371        env_hdc = shutil.which(connector_name)
372        # if not, add xdevice's own hdc path to environ path.
373        # tell if hdc has already been in the environ path.
374        if env_hdc is None:
375            connector_name = HDC_STD_NAME
376        LOG.debug("Peak end")
377        return connector_name
378
379
@dataclass
class HdcResponse:
    """Response from HDC."""
    # NOTE: neither attribute is annotated, so @dataclass does not treat
    # them as fields; they are plain class attributes with these defaults
    # and the generated __init__ takes no arguments.
    okay = ID_OKAY  # first 4 bytes in response were "OKAY"?
    message = ""  # diagnostic string if okay is false
385
386
387class SyncService:
388    """
389    Sync service class to push/pull to/from devices/emulators,
390    through the debug bridge.
391    """
392
393    def __init__(self, device, host=None, port=None):
394        self.device = device
395        self.host = host
396        self.port = port
397        self.sock = None
398
399    def open_sync(self, timeout=DEFAULT_TIMEOUT):
400        """
401        Opens the sync connection. This must be called before any calls to
402        push[File] / pull[File].
403        Return true if the connection opened, false if hdc refuse the
404        connection. This can happen device is invalid.
405        """
406        LOG.debug("Open sync, timeout=%s" % int(timeout / 1000))
407        self.sock = HdcHelper.socket(host=self.host, port=self.port,
408                                     timeout=timeout)
409        HdcHelper.set_device(self.device, self.sock)
410
411        request = HdcHelper.form_hdc_request("sync:")
412        HdcHelper.write(self.sock, request)
413
414        resp = HdcHelper.read_hdc_response(self.sock)
415        if not resp.okay:
416            err_msg = ErrorMessage.Hdc.Code_0304002.format(resp.message)
417            self.device.log.error(err_msg)
418            raise HdcError(err_msg)
419
420    def close(self):
421        """
422        Closes the connection.
423        """
424        if self.sock is not None:
425            try:
426                self.sock.close()
427            except socket.error as error:
428                LOG.error("Socket close error: %s" % error, error_no="00420")
429            finally:
430                self.sock = None
431
432    def pull_file(self, remote, local, is_create=False):
433        """
434        Pulls a file.
435        The top directory won't be created if is_create is False (by default)
436        and vice versa
437        """
438        mode = self.read_mode(remote)
439        self.device.log.debug("Remote file %s mode is %d" % (remote, mode))
440        if mode == 0:
441            raise HdcError(ErrorMessage.Device.Code_0303003.format(remote))
442
443        if str(mode).startswith("168"):
444            if is_create:
445                remote_file_split = os.path.split(remote)[-1] \
446                    if os.path.split(remote)[-1] else os.path.split(remote)[-2]
447                remote_file_basename = os.path.basename(remote_file_split)
448                new_local = os.path.join(local, remote_file_basename)
449                create_dir(new_local)
450            else:
451                new_local = local
452
453            collect_receiver = CollectingOutputReceiver()
454            HdcHelper.execute_shell_command(self.device, "ls %s" % remote,
455                                            receiver=collect_receiver)
456            files = collect_receiver.output.split()
457            for file_name in files:
458                self.pull_file("%s/%s" % (remote, file_name),
459                               new_local, is_create=True)
460        elif mode == SPECIAL_FILE_MODE:
461            self.device.log.info("skipping special file '%s'" % remote)
462        else:
463            if os.path.isdir(local):
464                local = os.path.join(local, os.path.basename(remote))
465
466            self.do_pull_file(remote, local)
467
468    def do_pull_file(self, remote, local):
469        """
470        Pulls a remote file
471        """
472        self.device.log.info(
473            "%s pull %s to %s" % (convert_serial(self.device.device_sn),
474                                  remote, local))
475        remote_path_content = remote.encode(DEFAULT_ENCODING)
476        if len(remote_path_content) > REMOTE_PATH_MAX_LENGTH:
477            raise HdcError(ErrorMessage.Hdc.Code_0304003)
478
479        msg = self.create_file_req(ID_RECV, remote_path_content)
480        HdcHelper.write(self.sock, msg)
481        pull_result = HdcHelper.read(self.sock, DATA_UNIT_LENGTH * 2)
482        if not self.check_result(pull_result, ID_DATA) and \
483                not self.check_result(pull_result, ID_DONE):
484            raise HdcError(self.read_error_message(pull_result))
485        if platform.system() == "Windows":
486            flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND | os.O_BINARY
487        else:
488            flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND
489        pulled_file_open = os.open(local, flags, FilePermission.mode_755)
490        with os.fdopen(pulled_file_open, "wb") as pulled_file:
491            while True:
492                if self.check_result(pull_result, ID_DONE):
493                    break
494
495                if not self.check_result(pull_result, ID_DATA):
496                    raise HdcError(self.read_error_message(pull_result))
497
498                try:
499                    length = self.swap32bit_from_array(
500                        pull_result, DEFAULT_OFFSET_OF_INT)
501                except IndexError as index_error:
502                    self.device.log.debug("do_pull_file: %s" %
503                                          str(pull_result))
504                    if pull_result == ID_DATA:
505                        pull_result = self.sock.recv(DATA_UNIT_LENGTH)
506                        self.device.log.debug(
507                            "do_pull_file: %s" % str(pull_result))
508                        length = self.swap32bit_from_array(pull_result, 0)
509                        self.device.log.debug("do_pull_file: %s" % str(length))
510                    else:
511                        raise IndexError(str(index_error)) from index_error
512
513                if length > SYNC_DATA_MAX:
514                    raise HdcError(ErrorMessage.Hdc.Code_0304004)
515
516                pulled_file.write(HdcHelper.read(self.sock, length))
517                pulled_file.flush()
518                pull_result = self.sock.recv(DATA_UNIT_LENGTH * 2)
519
520    def push_file(self, local, remote, is_create=False):
521        """
522        Push a single file.
523        The top directory won't be created if is_create is False (by default)
524        and vice versa
525        """
526        if not os.path.exists(local):
527            raise HdcError(ErrorMessage.Device.Code_0303002.format(local))
528
529        if os.path.isdir(local):
530            if is_create:
531                local_file_split = os.path.split(local)[-1] \
532                    if os.path.split(local)[-1] else os.path.split(local)[-2]
533                local_file_basename = os.path.basename(local_file_split)
534                remote = "{}/{}".format(
535                    remote, local_file_basename)
536                HdcHelper.execute_shell_command(
537                    self.device, "mkdir -p %s" % remote)
538
539            for child in os.listdir(local):
540                file_path = os.path.join(local, child)
541                if os.path.isdir(file_path):
542                    self.push_file(
543                        file_path, "%s/%s" % (remote, child),
544                        is_create=False)
545                else:
546                    self.do_push_file(file_path, "%s/%s" % (remote, child))
547        else:
548            self.do_push_file(local, remote)
549
550    def do_push_file(self, local, remote):
551        """
552        Push a single file
553
554        Args:
555        ------------
556        local : string
557            the local file to push
558        remote : string
559            the remote file (length max is 1024)
560        """
561        mode = self.read_mode(remote)
562        self.device.log.debug("Remote file %s mode is %d" % (remote, mode))
563        self.device.log.debug("%s execute command: hdc push %s %s" % (
564            convert_serial(self.device.device_sn), local, remote))
565        if str(mode).startswith("168"):
566            remote = "%s/%s" % (remote, os.path.basename(local))
567
568        try:
569            try:
570                remote_path_content = remote.encode(DEFAULT_ENCODING)
571            except UnicodeEncodeError as _:
572                remote_path_content = remote.encode("UTF-8")
573            if len(remote_path_content) > REMOTE_PATH_MAX_LENGTH:
574                raise HdcError(ErrorMessage.Hdc.Code_0304003)
575
576            # create the header for the action
577            # and send it. We use a custom try/catch block to make the
578            # difference between file and network IO exceptions.
579            msg = self.create_send_file_req(ID_SEND, remote_path_content,
580                                            FilePermission.mode_644)
581
582            HdcHelper.write(self.sock, msg)
583            flags = os.O_RDONLY
584            modes = stat.S_IWUSR | stat.S_IRUSR
585            with os.fdopen(os.open(local, flags, modes), "rb") as test_file:
586                while True:
587                    if platform.system() == "Linux":
588                        data = test_file.read(1024 * 4)
589                    else:
590                        data = test_file.read(SYNC_DATA_MAX)
591
592                    if not data:
593                        break
594
595                    buf = struct.pack(
596                        "%ds%ds%ds" % (len(ID_DATA), FORMAT_BYTES_LENGTH,
597                                       len(data)), ID_DATA,
598                        self.swap32bits_to_bytes(len(data)), data)
599                    self.sock.send(buf)
600        except Exception as exception:
601            self.device.log.error("exception %s" % exception)
602            raise exception
603
604        msg = self.create_req(ID_DONE, int(time.time()))
605        HdcHelper.write(self.sock, msg)
606        result = HdcHelper.read(self.sock, DATA_UNIT_LENGTH * 2)
607        if not self.check_result(result, ID_OKAY):
608            self.device.log.error("exception %s" % result)
609            raise HdcError(self.read_error_message(result))
610
611    def read_mode(self, path):
612        """
613        Returns the mode of the remote file.
614        Return an Integer containing the mode if all went well or null
615        """
616        msg = self.create_file_req(ID_STAT, path)
617        HdcHelper.write(self.sock, msg)
618
619        # read the result, in a byte array containing 4 ints
620        stat_result = HdcHelper.read(self.sock, DATA_UNIT_LENGTH * 4)
621        if not self.check_result(stat_result, ID_STAT):
622            return INVALID_MODE_CODE
623
624        return self.swap32bit_from_array(stat_result, DEFAULT_OFFSET_OF_INT)
625
626    def create_file_req(self, command, path):
627        """
628        Creates the data array for a file request. This creates an array with a
629        4 byte command + the remote file name.
630
631        Args:
632        ------------
633        command :
634            the 4 byte command (ID_STAT, ID_RECV, ...)
635        path : string
636            The path, as a byte array, of the remote file on which to execute
637            the command.
638
639        return:
640        ------------
641            return the byte[] to send to the device through hdc
642        """
643        if isinstance(path, str):
644            try:
645                path = path.encode(DEFAULT_ENCODING)
646            except UnicodeEncodeError as _:
647                path = path.encode("UTF-8")
648
649        return struct.pack(
650            "%ds%ds%ds" % (len(command), FORMAT_BYTES_LENGTH, len(path)),
651            command, self.swap32bits_to_bytes(len(path)), path)
652
653    def create_send_file_req(self, command, path, mode=0o644):
654        # make the mode into a string
655        mode_str = ",%s" % str(mode & FilePermission.mode_777)
656        mode_content = mode_str.encode(DEFAULT_ENCODING)
657        return struct.pack(
658            "%ds%ds%ds%ds" % (len(command), FORMAT_BYTES_LENGTH, len(path),
659                              len(mode_content)),
660            command, self.swap32bits_to_bytes(len(path) + len(mode_content)),
661            path, mode_content)
662
663    def create_req(self, command, value):
664        """
665        Create a command with a code and an int values
666        """
667        return struct.pack("%ds%ds" % (len(command), FORMAT_BYTES_LENGTH),
668                           command, self.swap32bits_to_bytes(value))
669
670    @staticmethod
671    def check_result(result, code):
672        """
673        Checks the result array starts with the provided code
674
675        Args:
676        ------------
677        result :
678            the result array to check
679        path : string
680            the 4 byte code
681
682        return:
683        ------------
684        bool
685            return true if the code matches
686        """
687        return result[0:4] == code[0:4]
688
689    def read_error_message(self, result):
690        """
691        Reads an error message from the opened Socket.
692
693        Args:
694        ------------
695        result :
696            the current hdc result. Must contain both FAIL and the length of
697            the message.
698        """
699        if self.check_result(result, ID_FAIL):
700            length = self.swap32bit_from_array(result, 4)
701            if length > 0:
702                return str(HdcHelper.read(self.sock, length))
703
704        return None
705
706    @staticmethod
707    def swap32bits_to_bytes(value):
708        """
709        Swaps an unsigned value around, and puts the result in an bytes that
710        can be sent to a device.
711
712        Args:
713        ------------
714        value :
715            the value to swap.
716        """
717        return bytes([value & 0x000000FF,
718                      (value & 0x0000FF00) >> 8,
719                      (value & 0x00FF0000) >> 16,
720                      (value & 0xFF000000) >> 24])
721
722    @staticmethod
723    def swap32bit_from_array(value, offset):
724        """
725        Reads a signed 32 bit integer from an array coming from a device.
726
727        Args:
728        ------------
729        value :
730            the array containing the int
731        offset:
732            the offset in the array at which the int starts
733
734        Return:
735        ------------
736        int
737            the integer read from the array
738        """
739        result = 0
740        result |= (int(value[offset])) & 0x000000FF
741        result |= (int(value[offset + 1]) & 0x000000FF) << 8
742        result |= (int(value[offset + 2]) & 0x000000FF) << 16
743        result |= (int(value[offset + 3]) & 0x000000FF) << 24
744
745        return result
746
747
748class HdcHelper:
749    CONNECTOR_NAME = ""
750
751    @staticmethod
752    def check_if_hdc_running(timeout=30):
753        LOG.debug("Check if {} is running, timeout is {}s".format(
754            HdcHelper.CONNECTOR_NAME, timeout))
755        index = 1
756        while index < timeout:
757            if is_proc_running(HdcHelper.CONNECTOR_NAME):
758                return True
759            index = index + 1
760            time.sleep(1)
761        return False
762
763    @staticmethod
764    def push_file(device, local, remote, is_create=False,
765                  timeout=DEFAULT_TIMEOUT):
766        device.log.info("{} execute command: {} file send {} to {}".format(
767            convert_serial(device.device_sn), HdcHelper.CONNECTOR_NAME, local, remote))
768        HdcHelper._operator_file("file send", device, local, remote, timeout)
769
770    @staticmethod
771    def pull_file(device, remote, local, is_create=False,
772                  timeout=DEFAULT_TIMEOUT):
773        device.log.info("{} execute command: {} file recv {} to {}".format(
774            convert_serial(device.device_sn), HdcHelper.CONNECTOR_NAME, remote, local))
775        HdcHelper._operator_file("file recv", device, remote, local, timeout)
776
777    @staticmethod
778    def _install_remote_package(device, remote_file_path, command):
779        receiver = CollectingOutputReceiver()
780        cmd = "bm install -p %s %s" % (command.strip(), remote_file_path)
781        HdcHelper.execute_shell_command(device, cmd, INSTALL_TIMEOUT, receiver)
782        return receiver.output
783
784    @staticmethod
785    def install_package(device, package_file_path, command):
786        device.log.info("%s install %s" % (convert_serial(device.device_sn),
787                                           package_file_path))
788        remote_file_path = "/data/local/tmp/%s" % os.path.basename(
789            package_file_path)
790        HdcHelper.push_file(device, package_file_path, remote_file_path)
791        result = HdcHelper._install_remote_package(device, remote_file_path,
792                                                   command)
793        HdcHelper.execute_shell_command(device, "rm %s " % remote_file_path)
794        return result
795
796    @staticmethod
797    def uninstall_package(device, package_name):
798        receiver = CollectingOutputReceiver()
799        command = "bm uninstall -n %s " % package_name
800        device.log.info("%s %s" % (convert_serial(device.device_sn), command))
801        HdcHelper.execute_shell_command(device, command, INSTALL_TIMEOUT,
802                                        receiver)
803        return receiver.output
804
805    @staticmethod
806    def reboot(device, into=None):
807        device.log.info("{} execute command: {} target boot".format(convert_serial(device.device_sn),
808                                                                    HdcHelper.CONNECTOR_NAME))
809        with HdcHelper.socket(host=device.host, port=device.port) as sock:
810            HdcHelper.handle_shake(sock, device.device_sn)
811            request = HdcHelper.form_hdc_request("target boot")
812            HdcHelper.write(sock, request)
813        # 2024/7/4日出现新系统执行hdc target boot后不会马上重启情况,需要等待2s
814        time.sleep(2)
815
816    @staticmethod
817    def execute_shell_command(device, command, timeout=DEFAULT_TIMEOUT,
818                              receiver=None, **kwargs):
819        """
820        Executes a shell command on the device and retrieve the output.
821
822        Args:
823        ------------
824        device : IDevice
825            on which to execute the command.
826        command : string
827            the shell command to execute
828        timeout : int
829            max time between command output. If more time passes between
830            command output, the method will throw
831            ShellCommandUnresponsiveException (ms).
832        """
833        try:
834            if not timeout:
835                timeout = DEFAULT_TIMEOUT
836
837            with HdcHelper.socket(host=device.host, port=device.port,
838                                  timeout=timeout) as sock:
839                output_flag = kwargs.get("output_flag", True)
840                timeout_msg = " with timeout %ss" % str(timeout / 1000)
841                message = "{} execute command: {} shell {}{}".format(convert_serial(device.device_sn),
842                                                                     HdcHelper.CONNECTOR_NAME,
843                                                                     convert_mac(command), timeout_msg)
844                if output_flag:
845                    LOG.info(message)
846                else:
847                    LOG.debug(message)
848                from xdevice import Binder
849                HdcHelper.handle_shake(sock, device.device_sn)
850                request = HdcHelper.form_hdc_request("shell {}".format(command))
851                HdcHelper.write(sock, request)
852                resp = HdcResponse()
853                resp.okay = True
854                while True:
855                    len_buf = sock.recv(DATA_UNIT_LENGTH)
856                    if len_buf:
857                        length = struct.unpack("!I", len_buf)[0]
858                    else:
859                        break
860                    data = sock.recv(length)
861                    recv_length = len(data)
862                    if recv_length < length:
863                        data += HdcHelper.read(sock, length - recv_length)
864                    ret = HdcHelper.reply_to_string(data)
865                    if ret:
866                        if receiver:
867                            receiver.__read__(ret)
868                        else:
869                            LOG.debug(ret)
870                    if not Binder.is_executing():
871                        raise ExecuteTerminate()
872                return resp
873        except socket.timeout as error:
874            err_msg = ErrorMessage.Device.Code_0303013.format(
875                convert_serial(device.device_sn), convert_mac(command), str(timeout / 1000))
876            device.log.error(err_msg)
877            raise ShellCommandUnresponsiveException() from error
878        finally:
879            if receiver:
880                receiver.__done__()
881
882    @staticmethod
883    def set_device(device, sock):
884        """
885        Tells hdc to talk to a specific device
886        if the device is not -1, then we first tell hdc we're looking to talk
887        to a specific device
888        """
889        msg = "host:transport:%s" % device.device_sn
890        device_query = HdcHelper.form_hdc_request(msg)
891        HdcHelper.write(sock, device_query)
892        resp = HdcHelper.read_hdc_response(sock)
893        if not resp.okay:
894            raise HdcCommandRejectedException(ErrorMessage.Hdc.Code_0304005.format(resp.message))
895
896    @staticmethod
897    def form_hdc_request(req):
898        """
899        Create an ASCII string preceded by four hex digits.
900        """
901        try:
902            if not req.endswith('\0'):
903                req = "%s\0" % req
904            req = req.encode("utf-8")
905            fmt = "!I%ss" % len(req)
906            result = struct.pack(fmt, len(req), req)
907        except UnicodeEncodeError as ex:
908            LOG.error(ex)
909            raise ex
910        return result
911
912    @staticmethod
913    def read_hdc_response(sock, read_diag_string=False):
914        """
915        Reads the response from HDC after a command.
916
917        Args:
918        ------------
919        read_diag_string :
920            If true, we're expecting an OKAY response to be followed by a
921            diagnostic string. Otherwise, we only expect the diagnostic string
922            to follow a FAIL.
923        """
924        resp = HdcResponse()
925        reply = HdcHelper.read(sock, DATA_UNIT_LENGTH)
926        if HdcHelper.is_okay(reply):
927            resp.okay = True
928        else:
929            read_diag_string = True
930            resp.okay = False
931
932        while read_diag_string:
933            len_buf = HdcHelper.read(sock, DATA_UNIT_LENGTH)
934            len_str = HdcHelper.reply_to_string(len_buf)
935            msg = HdcHelper.read(sock, int(len_str, HEXADECIMAL_NUMBER))
936            resp.message = HdcHelper.reply_to_string(msg)
937            break
938
939        return resp
940
941    @staticmethod
942    def write(sock, req, timeout=10):
943        if isinstance(req, str):
944            req = req.encode(DEFAULT_ENCODING)
945        elif isinstance(req, list):
946            req = bytes(req)
947
948        start_time = time.time()
949        while req:
950            if time.time() - start_time > timeout:
951                LOG.debug("Socket write timeout, timeout:%ss" % timeout)
952                break
953
954            size = sock.send(req)
955            if size < 0:
956                raise DeviceError(ErrorMessage.Device.Code_0303017)
957
958            req = req[size:]
959            time.sleep(5 / 1000)
960
961    @staticmethod
962    def read(sock, length, timeout=10):
963        data = b''
964        recv_len = 0
965        start_time = time.time()
966        exc_num = 3
967        while length - recv_len > 0:
968            if time.time() - start_time > timeout:
969                LOG.debug("Socket read timeout, timout:%ss" % timeout)
970                break
971            try:
972                recv = sock.recv(length - recv_len)
973                if len(recv) > 0:
974                    time.sleep(5 / 1000)
975                else:
976                    break
977            except ConnectionResetError as error:
978                if exc_num <= 0:
979                    raise error
980                exc_num = exc_num - 1
981                recv = b''
982                time.sleep(1)
983                LOG.debug("ConnectionResetError occurs")
984
985            data += recv
986            recv_len += len(recv)
987
988        return data
989
990    @staticmethod
991    def is_okay(reply):
992        """
993        Checks to see if the first four bytes in "reply" are OKAY.
994        """
995        return reply[0:4] == ID_OKAY
996
997    @staticmethod
998    def reply_to_string(reply):
999        """
1000        Converts an HDC reply to a string.
1001        """
1002        for encoding in [DEFAULT_ENCODING, COMPATIBLE_ENCODING]:
1003            try:
1004                return str(reply, encoding=encoding)
1005            except (ValueError, TypeError) as _:
1006                continue
1007        return ""
1008
1009    @staticmethod
1010    def socket(host=None, port=None, timeout=None):
1011        end = time.time() + 10 * 60
1012        sock = None
1013        hdc_connection = HdcMonitor.MONITOR_MAP.get(host, "127.0.0.1")
1014        while host not in HdcMonitor.MONITOR_MAP or \
1015                hdc_connection.main_hdc_connection is None:
1016            LOG.debug("Host: %s, port: %s, HdcMonitor map is %s" % (
1017                host, port, HdcMonitor.MONITOR_MAP))
1018            if host in HdcMonitor.MONITOR_MAP:
1019                LOG.debug("Monitor main hdc connection is %s" %
1020                          hdc_connection.main_hdc_connection)
1021            if time.time() > end:
1022                raise HdcError(ErrorMessage.Hdc.Code_0304006)
1023            time.sleep(2)
1024
1025        try:
1026            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1027            sock.connect((host, int(port)))
1028        except socket.error as exception:
1029            LOG.exception("Connect hdc server error: %s" % str(exception),
1030                          exc_info=False)
1031            raise exception
1032
1033        if sock is None:
1034            raise HdcError(ErrorMessage.Hdc.Code_0304007)
1035
1036        if timeout is not None:
1037            sock.setblocking(False)
1038            sock.settimeout(timeout / 1000)
1039
1040        return sock
1041
1042    @staticmethod
1043    def handle_shake(connection, connect_key=""):
1044        reply = HdcHelper.read(connection, 48)
1045        struct.unpack(">I12s32s", reply)
1046        banner_str = b'OHOS HDC'
1047        connect_key = connect_key.encode("utf-8")
1048        size = struct.calcsize('12s256s')
1049        fmt = "!I12s256s"
1050        pack_cmd = struct.pack(fmt, size, banner_str, connect_key)
1051        HdcHelper.write(connection, pack_cmd)
1052        return True
1053
1054    @staticmethod
1055    def _operator_file(command, device, local, remote, timeout):
1056        sock = HdcHelper.socket(host=device.host, port=device.port,
1057                                timeout=timeout)
1058        HdcHelper.handle_shake(sock, device.device_sn)
1059        request = HdcHelper.form_hdc_request(
1060            "%s %s %s" % (command, local, remote))
1061        HdcHelper.write(sock, request)
1062        reply = HdcHelper.read(sock, DATA_UNIT_LENGTH)
1063        length = struct.unpack("!I", reply)[0]
1064        data_buf = HdcHelper.read(sock, length)
1065        HdcHelper.reply_to_string(data_buf)
1066        LOG.debug(data_buf.decode().strip())
1067
1068    @staticmethod
1069    def is_hdc_std():
1070        return HDC_STD_NAME in HdcHelper.CONNECTOR_NAME
1071
1072
class DeviceConnector(object):
    """
    Connects an HdcMonitor for one hdc server to a set of listeners.

    Device events reported to this object are logged and forwarded to every
    registered device change listener.
    """
    __instance = None
    __init_flag = False

    def __init__(self, host=None, port=None, usb_type=None):
        """
        Args:
        ------------
        host : string
            hdc server address, "127.0.0.1" when empty
        port : int or string
            hdc server port; when empty, the OHOS_HDC_SERVER_PORT
            environment variable or DEFAULT_STD_PORT is used
        usb_type :
            stored as given
        """
        if DeviceConnector.__init_flag:
            return
        self.device_listeners = []
        self.device_monitor = None
        self.monitor_lock = threading.Condition()
        self.host = host or "127.0.0.1"
        self.usb_type = usb_type
        # The connector name found by the monitor is shared via HdcHelper.
        HdcHelper.CONNECTOR_NAME = HdcMonitor.peek_hdc()
        self.port = int(port) if port else int(
            os.getenv("OHOS_HDC_SERVER_PORT", DEFAULT_STD_PORT))

    def start(self):
        """Obtain the HdcMonitor for this host and port and start it."""
        monitor = HdcMonitor.get_instance(
            self.host, self.port, device_connector=self)
        self.device_monitor = monitor
        self.device_monitor.start()

    def terminate(self):
        """Stop the monitor, if there is one, and forget it."""
        monitor = self.device_monitor
        if monitor:
            monitor.stop()
        self.device_monitor = None

    def add_device_change_listener(self, device_change_listener):
        """Register a listener for device events."""
        self.device_listeners.append(device_change_listener)

    def remove_device_change_listener(self, device_change_listener):
        """Unregister a listener; unknown listeners are ignored."""
        try:
            self.device_listeners.remove(device_change_listener)
        except ValueError:
            pass

    def device_connected(self, device):
        """Log a connect event and forward it to every listener."""
        LOG.debug("DeviceConnector device connected:host %s, port %s, "
                  "device sn %s " % (self.host, self.port, device.device_sn))
        if not (device.host == self.host and device.port == self.port):
            LOG.debug("DeviceConnector device error")
        for listener in self.device_listeners:
            listener.device_connected(device)

    def device_disconnected(self, device):
        """Log a disconnect event and forward it to every listener."""
        LOG.debug("DeviceConnector device disconnected:host %s, port %s, "
                  "device sn %s" % (self.host, self.port, device.device_sn))
        if not (device.host == self.host and device.port == self.port):
            LOG.debug("DeviceConnector device error")
        for listener in self.device_listeners:
            listener.device_disconnected(device)

    def device_changed(self, device):
        """Log a change event and forward it to every listener."""
        LOG.debug("DeviceConnector device changed:host %s, port %s, "
                  "device sn %s" % (self.host, self.port, device.device_sn))
        if not (device.host == self.host and device.port == self.port):
            LOG.debug("DeviceConnector device error")
        for listener in self.device_listeners:
            listener.device_changed(device)
1132
1133
class CollectingOutputReceiver(IShellReceiver):
    """Shell receiver that accumulates everything it reads in ``output``."""

    def __init__(self):
        # Text received so far, in arrival order.
        self.output = ""

    def __read__(self, output):
        """Append ``output`` to the collected text."""
        collected = (self.output, output)
        self.output = "%s%s" % collected

    def __error__(self, message):
        """Ignore error notifications."""

    def __done__(self, result_code="", message=""):
        """Nothing to finalize."""
1146
1147
class DisplayOutputReceiver(IShellReceiver):
    """
    Shell receiver that collects output and logs it line by line.

    Everything read is appended to ``output``. Complete lines are stripped
    and written to the info log as they arrive; a trailing partial line is
    held back until the rest of it arrives or the command is done.
    """

    def __init__(self):
        # Full text received so far.
        self.output = ""
        # Tail of the last chunk that did not end with the end mark yet.
        self.unfinished_line = ""

    def _process_output(self, output, end_mark="\n"):
        """
        Split a chunk into complete lines, carrying partial lines over.

        Args:
        ------------
        output : string
            newly received text
        end_mark : string
            line terminator

        Return:
        ------------
        list
            the complete lines contained in the pending text plus output
        """
        content = output
        if self.unfinished_line:
            content = "".join((self.unfinished_line, content))
            self.unfinished_line = ""
        # The last element is "" when content ends with the end mark and
        # the unfinished remainder otherwise; either way it is not a
        # complete line and is kept for the next call.
        *complete, tail = content.split(end_mark)
        self.unfinished_line = tail
        return complete

    def __read__(self, output):
        """Store the chunk and log every non-blank complete line."""
        self.output = "%s%s" % (self.output, output)
        lines = self._process_output(output)
        for line in lines:
            line = line.strip()
            if line:
                LOG.info(line)

    def __error__(self, message):
        """Ignore error notifications."""

    def __done__(self, result_code="", message=""):
        """Log a held-back final line that had no line terminator."""
        # Without this flush, output that does not end with a newline would
        # never be displayed. Clearing the buffer keeps repeated calls
        # from logging it twice.
        pending = self.unfinished_line.strip()
        self.unfinished_line = ""
        if pending:
            LOG.info(pending)
1181
1182
def process_command_ret(ret, receiver):
    """
    Deliver command output to a receiver, or log it when there is none.

    Args:
    ------------
    ret : string
        command output; an empty string is ignored
    receiver :
        object providing __read__ and __done__, or a falsy value

    Raises:
    ------------
    ReportException
        when the receiver raises while handling the output
    """
    has_output = ret != ""
    try:
        if has_output and receiver:
            receiver.__read__(ret)
            receiver.__done__()
    except Exception as error:
        LOG.exception(ErrorMessage.Common.Code_0301014, exc_info=False)
        raise ReportException() from error

    if has_output and not receiver:
        # No receiver: write each non-blank line to the debug log.
        for raw_line in ret.split("\n"):
            text = raw_line.strip()
            if text:
                LOG.debug(text)
1198