• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import platform
21import socket
22import struct
23import threading
24import time
25import shutil
26import stat
27from dataclasses import dataclass
28
29from xdevice import DeviceOsType
30from xdevice import ReportException
31from xdevice import ExecuteTerminate
32from xdevice import platform_logger
33from xdevice import Plugin
34from xdevice import get_plugin
35from xdevice import IShellReceiver
36from xdevice import exec_cmd
37from xdevice import FilePermission
38from xdevice import DeviceError
39from xdevice import HdcError
40from xdevice import HdcCommandRejectedException
41from xdevice import ShellCommandUnresponsiveException
42from xdevice import DeviceState
43from xdevice import convert_serial
44from xdevice import convert_mac
45from xdevice import is_proc_running
46from xdevice import convert_ip
47from xdevice import create_dir
48from ohos.error import ErrorMessage
49
# 4-byte tags used as command / reply codes in sync requests
# (see SyncService.create_file_req and SyncService.check_result).
ID_OKAY = b'OKAY'
ID_FAIL = b'FAIL'
ID_STAT = b'STAT'
ID_RECV = b'RECV'
ID_DATA = b'DATA'
ID_DONE = b'DONE'
ID_SEND = b'SEND'
ID_LIST = b'LIST'
ID_DENT = b'DENT'

# Encoding applied to remote paths before they are put on the wire.
DEFAULT_ENCODING = "utf-8"
COMPATIBLE_ENCODING = "ISO-8859-1"
# Largest DATA chunk accepted when pulling; also the read size when pushing.
SYNC_DATA_MAX = 64 * 1024
# Upper bound, in encoded bytes, for a remote path in a sync request.
REMOTE_PATH_MAX_LENGTH = 1024
SOCK_DATA_MAX = 256

# NOTE(review): presumably milliseconds - open_sync logs "timeout / 1000";
# confirm against HdcHelper.socket.
INSTALL_TIMEOUT = 2 * 60 * 1000
DEFAULT_TIMEOUT = 40 * 1000

# HdcMonitor gives up once more than this many connection attempts failed.
MAX_CONNECT_ATTEMPT_COUNT = 10
# Size in bytes of one protocol word; headers are read as multiples of it.
DATA_UNIT_LENGTH = 4
HEXADECIMAL_NUMBER = 16
# 41471 == 0o120777; pull_file skips a remote entry whose mode equals this.
SPECIAL_FILE_MODE = 41471
# Width in bytes of the length field packed into sync requests.
FORMAT_BYTES_LENGTH = 4
# Byte offset of the first integer that follows the 4-byte tag in a reply.
DEFAULT_OFFSET_OF_INT = 4

# Returned by SyncService.read_mode when the reply is not a STAT reply.
INVALID_MODE_CODE = -1
# Port used by HdcMonitor.init_hdc when it has to start the host server.
DEFAULT_STD_PORT = 8710
HDC_NAME = "hdc"
HDC_STD_NAME = "hdc_std"
LOG = platform_logger("Hdc")
81
82
83class HdcMonitor:
84    """
85    A Device monitor.
86    This monitor connects to the Device Connector, gets device and
87    debuggable process information from it.
88    """
89    MONITOR_MAP = {}
90
91    def __init__(self, host="127.0.0.1", port=None, device_connector=None):
92        self.channel = dict()
93        self.channel.setdefault("host", host)
94        self.channel.setdefault("port", port)
95        self.main_hdc_connection = None
96        self.connection_attempt = 0
97        self.is_stop = False
98        self.monitoring = False
99        self.server = device_connector
100        self.devices = []
101        self.last_msg_len = 0
102        self.changed = True
103
104    @staticmethod
105    def get_instance(host, port=None, device_connector=None):
106        if host not in HdcMonitor.MONITOR_MAP:
107            monitor = HdcMonitor(host, port, device_connector)
108            HdcMonitor.MONITOR_MAP[host] = monitor
109            LOG.debug("HdcMonitor map add host %s, map is %s" %
110                      (host, HdcMonitor.MONITOR_MAP))
111
112        return HdcMonitor.MONITOR_MAP[host]
113
114    def start(self):
115        """
116        Starts the monitoring.
117        """
118        try:
119            server_thread = threading.Thread(target=self.loop_monitor,
120                                             name="HdcMonitor", args=())
121            server_thread.daemon = True
122            server_thread.start()
123        except FileNotFoundError as _:
124            LOG.error("HdcMonitor can't find connector, init device "
125                      "environment failed!")
126
127    def init_hdc(self, connector_name=HDC_NAME):
128        env_hdc = shutil.which(connector_name)
129        # if not, add xdevice's own hdc path to environ path.
130        # tell if hdc has already been in the environ path.
131        if env_hdc is None:
132            self.is_stop = True
133            LOG.error("Can not find {} or {} environment variable, please set first!".format(HDC_NAME, HDC_STD_NAME))
134            LOG.error("Stop {} monitor!".format(HdcHelper.CONNECTOR_NAME))
135        if not is_proc_running(connector_name):
136            port = DEFAULT_STD_PORT
137            self.start_hdc(
138                connector=connector_name,
139                local_port=self.channel.setdefault(
140                    "port", port))
141            time.sleep(1)
142
143    def _init_hdc_connection(self):
144        if self.main_hdc_connection is not None:
145            return
146        # set all devices disconnect
147        devices = [item for item in self.devices]
148        devices.reverse()
149        for local_device1 in devices:
150            local_device1.device_state = DeviceState.OFFLINE
151            self.server.device_changed(local_device1)
152
153        connector_name = HDC_STD_NAME if HdcHelper.is_hdc_std() else HDC_NAME
154        self.init_hdc(connector_name)
155        self.connection_attempt = 0
156        self.monitoring = False
157        while self.main_hdc_connection is None:
158            self.main_hdc_connection = self.open_hdc_connection()
159            if self.main_hdc_connection is None:
160                self.connection_attempt += 1
161                if self.connection_attempt > MAX_CONNECT_ATTEMPT_COUNT:
162                    LOG.error(
163                        "HdcMonitor attempt %s, can't connect to hdc "
164                        "for Device List Monitoring" %
165                        str(self.connection_attempt))
166                    raise HdcError(ErrorMessage.Hdc.Code_0304001.format(
167                        self.channel.get("host"), str(self.channel.get("post"))))
168
169                LOG.debug(
170                    "HdcMonitor Connection attempts: %s" %
171                    str(self.connection_attempt))
172                time.sleep(2)
173
174    def stop(self):
175        """
176        Stops the monitoring.
177        """
178        for host in HdcMonitor.MONITOR_MAP:
179            LOG.debug("HdcMonitor stop host %s" % host)
180            monitor = HdcMonitor.MONITOR_MAP[host]
181            try:
182                monitor.is_stop = True
183                if monitor.main_hdc_connection is not None:
184                    monitor.main_hdc_connection.shutdown(2)
185                    monitor.main_hdc_connection.close()
186                    monitor.main_hdc_connection = None
187            except (socket.error, socket.gaierror, socket.timeout) as _:
188                LOG.error("HdcMonitor close socket exception")
189        HdcMonitor.MONITOR_MAP.clear()
190        LOG.debug("HdcMonitor {} monitor stop!".format(HdcHelper.CONNECTOR_NAME))
191        LOG.debug("HdcMonitor map is %s" % HdcMonitor.MONITOR_MAP)
192
193    def loop_monitor(self):
194        """
195        Monitors the devices. This connects to the Debug Bridge
196        """
197        LOG.debug("current connector name is %s" % HdcHelper.CONNECTOR_NAME)
198        while not self.is_stop:
199            try:
200                if self.main_hdc_connection is None:
201                    self._init_hdc_connection()
202                    if self.main_hdc_connection is not None:
203                        LOG.debug(
204                            "HdcMonitor Connected to hdc for device "
205                            "monitoring, main_hdc_connection is %s" %
206                            self.main_hdc_connection)
207
208                self.list_targets()
209                time.sleep(2)
210            except (HdcError, Exception) as _:
211                self.handle_exception_monitor_loop()
212                time.sleep(2)
213
214    def handle_exception_monitor_loop(self):
215        LOG.debug("Handle exception monitor loop: %s" %
216                  self.main_hdc_connection)
217        if self.main_hdc_connection is None:
218            return
219        LOG.debug("Handle exception monitor loop, main hdc connection closed, "
220                  "main hdc connection: %s" % self.main_hdc_connection)
221        self.main_hdc_connection.close()
222        self.main_hdc_connection = None
223
224    def _get_device_instance(self, items, os_type):
225        device = get_plugin(plugin_type=Plugin.DEVICE, plugin_id=os_type)[0]
226        device_instance = device.__class__()
227        device_instance.__set_serial__(items[0])
228        device_instance.host = self.channel.get("host")
229        device_instance.port = self.channel.get("port")
230        if self.changed:
231            if DeviceState.get_state(items[3]) == DeviceState.CONNECTED:
232                LOG.debug("Dmlib get device instance {} {} {}, status: {}".format(
233                    device_instance.device_sn, device_instance.host, device_instance.port, items[3]))
234            else:
235                LOG.debug("Dmlib ignore device instance {} {} {}, status: {}".format(
236                    device_instance.device_sn, device_instance.host, device_instance.port, items[3]))
237        device_instance.device_state = DeviceState.get_state(items[3])
238        return device_instance
239
240    def update_devices(self, param_array_list):
241        devices = [item for item in self.devices]
242        devices.reverse()
243        for local_device1 in devices:
244            k = 0
245            for local_device2 in param_array_list:
246                if local_device1.device_sn == local_device2.device_sn and \
247                        local_device1.device_os_type == \
248                        local_device2.device_os_type:
249                    k = 1
250                    if local_device1.device_state != \
251                            local_device2.device_state:
252                        local_device1.device_state = local_device2.device_state
253                        self.server.device_changed(local_device1)
254                    param_array_list.remove(local_device2)
255                    break
256
257            if k == 0:
258                self.devices.remove(local_device1)
259                self.server.device_disconnected(local_device1)
260        for local_device in param_array_list:
261            self.devices.append(local_device)
262            self.server.device_connected(local_device)
263
264    def open_hdc_connection(self):
265        """
266        Attempts to connect to the debug bridge server. Return a connect socket
267        if success, null otherwise.
268        """
269        try:
270            LOG.debug(
271                "HdcMonitor connecting to hdc for Device List Monitoring")
272            LOG.debug("HdcMonitor socket connection host: %s, port: %s" %
273                      (str(convert_ip(self.channel.get("host"))),
274                       str(int(self.channel.get("port")))))
275
276            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
277            sock.connect((self.channel.get("host"),
278                          int(self.channel.get("port"))))
279            return sock
280
281        except (socket.error, socket.gaierror, socket.timeout) as exception:
282            LOG.error("HdcMonitor hdc socket connection Error: %s, "
283                      "host is %s, port is %s" % (str(exception),
284                                                  self.channel.get("host"),
285                                                  self.channel.get("port")))
286            return None
287
288    def start_hdc(self, connector=HDC_NAME, kill=False, local_port=None):
289        """Starts the hdc host side server.
290
291        Args:
292            connector: connector type, like "hdc"
293            kill: if True, kill exist host side server
294            local_port: local port to start host side server
295
296        Returns:
297            None
298        """
299        if kill:
300            LOG.debug("HdcMonitor {} kill".format(connector))
301            exec_cmd([connector, "kill"])
302        LOG.debug("HdcMonitor {} start".format(connector))
303        exec_cmd(
304            [connector, "-l5", "start"],
305            error_print=False, redirect=True)
306
307    def list_targets(self):
308        if self.main_hdc_connection:
309            self.server.monitor_lock.acquire(timeout=1)
310            try:
311                self.monitoring_list_targets()
312                len_buf = HdcHelper.read(self.main_hdc_connection,
313                                         DATA_UNIT_LENGTH)
314                length = struct.unpack("!I", len_buf)[0]
315                if length >= 0:
316                    if self.last_msg_len != length:
317                        LOG.debug("had received length is: %s" % length)
318                        self.last_msg_len = length
319                        self.changed = True
320                    else:
321                        self.changed = False
322                    self.connection_attempt = 0
323                    self.process_incoming_target_data(length)
324            except Exception as e:
325                LOG.error(e)
326                raise e
327            finally:
328                self.server.monitor_lock.release()
329
330    def monitoring_list_targets(self):
331        try:
332            if not self.monitoring:
333                self.main_hdc_connection.settimeout(3)
334                HdcHelper.handle_shake(self.main_hdc_connection)
335                self.main_hdc_connection.settimeout(None)
336                request = HdcHelper.form_hdc_request("alive")
337                HdcHelper.write(self.main_hdc_connection, request)
338                self.monitoring = True
339        except socket.timeout as e:
340            LOG.error("HdcMonitor Connection handshake: error {}".format(e))
341            time.sleep(3)
342            raise HdcError(ErrorMessage.Hdc.Code_0304001.format(
343                self.channel.get("host"), self.channel.get("post"))) from e
344        request = HdcHelper.form_hdc_request('list targets -v')
345        HdcHelper.write(self.main_hdc_connection, request)
346
347    def process_incoming_target_data(self, length):
348        local_array_list = []
349        data_buf = HdcHelper.read(self.main_hdc_connection, length)
350        data_str = HdcHelper.reply_to_string(data_buf)
351        if 'Empty' not in data_str:
352            lines = data_str.split('\n')
353            for line in lines:
354                items = line.strip().split('\t')
355                # Example: sn    USB     Offline localhost       hdc
356                if not items[0] or len(items) < 5:
357                    continue
358                device_instance = self._get_device_instance(
359                    items, DeviceOsType.default)
360                local_array_list.append(device_instance)
361        else:
362            if self.changed:
363                LOG.debug("please check device actually.[%s]" % data_str.strip())
364        self.update_devices(local_array_list)
365
366    @staticmethod
367    def peek_hdc():
368        LOG.debug("Peek running process to check expect connector.")
369        # if not find hdc_std, find hdc
370        connector_name = HDC_NAME
371        env_hdc = shutil.which(connector_name)
372        # if not, add xdevice's own hdc path to environ path.
373        # tell if hdc has already been in the environ path.
374        if env_hdc is None:
375            connector_name = HDC_STD_NAME
376        LOG.debug("Peak end")
377        return connector_name
378
379
@dataclass
class HdcResponse:
    """Response from HDC.

    The attributes are annotated so that @dataclass treats them as real
    fields. Without annotations the decorator generated an empty __init__,
    __repr__ and __eq__. ``HdcResponse()`` keeps working as before.
    """
    # Truthy when the first 4 bytes in the response were "OKAY".
    okay: object = ID_OKAY
    # Diagnostic string if okay is false.
    message: str = ""
385
386
387class SyncService:
388    """
389    Sync service class to push/pull to/from devices/emulators,
390    through the debug bridge.
391    """
392
393    def __init__(self, device, host=None, port=None):
394        self.device = device
395        self.host = host
396        self.port = port
397        self.sock = None
398
399    def open_sync(self, timeout=DEFAULT_TIMEOUT):
400        """
401        Opens the sync connection. This must be called before any calls to
402        push[File] / pull[File].
403        Return true if the connection opened, false if hdc refuse the
404        connection. This can happen device is invalid.
405        """
406        LOG.debug("Open sync, timeout=%s" % int(timeout / 1000))
407        self.sock = HdcHelper.socket(host=self.host, port=self.port,
408                                     timeout=timeout)
409        HdcHelper.set_device(self.device, self.sock)
410
411        request = HdcHelper.form_hdc_request("sync:")
412        HdcHelper.write(self.sock, request)
413
414        resp = HdcHelper.read_hdc_response(self.sock)
415        if not resp.okay:
416            err_msg = ErrorMessage.Hdc.Code_0304002.format(resp.message)
417            self.device.log.error(err_msg)
418            raise HdcError(err_msg)
419
420    def close(self):
421        """
422        Closes the connection.
423        """
424        if self.sock is not None:
425            try:
426                self.sock.close()
427            except socket.error as error:
428                LOG.error("Socket close error: %s" % error, error_no="00420")
429            finally:
430                self.sock = None
431
432    def pull_file(self, remote, local, is_create=False):
433        """
434        Pulls a file.
435        The top directory won't be created if is_create is False (by default)
436        and vice versa
437        """
438        mode = self.read_mode(remote)
439        self.device.log.debug("Remote file %s mode is %d" % (remote, mode))
440        if mode == 0:
441            raise HdcError(ErrorMessage.Device.Code_0303003.format(remote))
442
443        if str(mode).startswith("168"):
444            if is_create:
445                remote_file_split = os.path.split(remote)[-1] \
446                    if os.path.split(remote)[-1] else os.path.split(remote)[-2]
447                remote_file_basename = os.path.basename(remote_file_split)
448                new_local = os.path.join(local, remote_file_basename)
449                create_dir(new_local)
450            else:
451                new_local = local
452
453            collect_receiver = CollectingOutputReceiver()
454            HdcHelper.execute_shell_command(self.device, "ls %s" % remote,
455                                            receiver=collect_receiver)
456            files = collect_receiver.output.split()
457            for file_name in files:
458                self.pull_file("%s/%s" % (remote, file_name),
459                               new_local, is_create=True)
460        elif mode == SPECIAL_FILE_MODE:
461            self.device.log.info("skipping special file '%s'" % remote)
462        else:
463            if os.path.isdir(local):
464                local = os.path.join(local, os.path.basename(remote))
465
466            self.do_pull_file(remote, local)
467
    def do_pull_file(self, remote, local):
        """
        Pull one remote file into the local path over the sync socket.

        A RECV request is sent for ``remote``; the reply is a sequence of
        DATA chunks (header: 4-byte tag + 4-byte length) ended by DONE.
        Every chunk is written to ``local`` as it arrives.

        Raises:
            HdcError: if the encoded remote path exceeds
                REMOTE_PATH_MAX_LENGTH, a reply header is neither DATA nor
                DONE, or a chunk is larger than SYNC_DATA_MAX.
            IndexError: if a DATA header is too short and cannot be
                completed.
        """
        self.device.log.info(
            "%s pull %s to %s" % (convert_serial(self.device.device_sn),
                                  remote, local))
        remote_path_content = remote.encode(DEFAULT_ENCODING)
        if len(remote_path_content) > REMOTE_PATH_MAX_LENGTH:
            raise HdcError(ErrorMessage.Hdc.Code_0304003)

        msg = self.create_file_req(ID_RECV, remote_path_content)
        HdcHelper.write(self.sock, msg)
        # First header: tag + length, two protocol words.
        pull_result = HdcHelper.read(self.sock, DATA_UNIT_LENGTH * 2)
        if not self.check_result(pull_result, ID_DATA) and \
                not self.check_result(pull_result, ID_DONE):
            raise HdcError(self.read_error_message(pull_result))
        # NOTE(review): the flags contain O_APPEND and no O_TRUNC, so an
        # existing local file is appended to rather than replaced - confirm
        # this is intended.
        if platform.system() == "Windows":
            flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND | os.O_BINARY
        else:
            flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND
        pulled_file_open = os.open(local, flags, FilePermission.mode_755)
        with os.fdopen(pulled_file_open, "wb") as pulled_file:
            while True:
                if self.check_result(pull_result, ID_DONE):
                    break

                if not self.check_result(pull_result, ID_DATA):
                    raise HdcError(self.read_error_message(pull_result))

                try:
                    length = self.swap32bit_from_array(
                        pull_result, DEFAULT_OFFSET_OF_INT)
                except IndexError as index_error:
                    # The header was shorter than 8 bytes. If exactly the
                    # DATA tag arrived, fetch the length word separately.
                    self.device.log.debug("do_pull_file: %s" %
                                          str(pull_result))
                    if pull_result == ID_DATA:
                        pull_result = self.sock.recv(DATA_UNIT_LENGTH)
                        self.device.log.debug(
                            "do_pull_file: %s" % str(pull_result))
                        length = self.swap32bit_from_array(pull_result, 0)
                        self.device.log.debug("do_pull_file: %s" % str(length))
                    else:
                        raise IndexError(str(index_error)) from index_error

                if length > SYNC_DATA_MAX:
                    raise HdcError(ErrorMessage.Hdc.Code_0304004)

                pulled_file.write(HdcHelper.read(self.sock, length))
                pulled_file.flush()
                # Next header. recv() may return fewer than the requested
                # bytes, which is the case the IndexError branch recovers from.
                pull_result = self.sock.recv(DATA_UNIT_LENGTH * 2)
519
520    def push_file(self, local, remote, is_create=False):
521        """
522        Push a single file.
523        The top directory won't be created if is_create is False (by default)
524        and vice versa
525        """
526        if not os.path.exists(local):
527            raise HdcError(ErrorMessage.Device.Code_0303002.format(local))
528
529        if os.path.isdir(local):
530            if is_create:
531                local_file_split = os.path.split(local)[-1] \
532                    if os.path.split(local)[-1] else os.path.split(local)[-2]
533                local_file_basename = os.path.basename(local_file_split)
534                remote = "{}/{}".format(
535                    remote, local_file_basename)
536                HdcHelper.execute_shell_command(
537                    self.device, "mkdir -p %s" % remote)
538
539            for child in os.listdir(local):
540                file_path = os.path.join(local, child)
541                if os.path.isdir(file_path):
542                    self.push_file(
543                        file_path, "%s/%s" % (remote, child),
544                        is_create=False)
545                else:
546                    self.do_push_file(file_path, "%s/%s" % (remote, child))
547        else:
548            self.do_push_file(local, remote)
549
550    def do_push_file(self, local, remote):
551        """
552        Push a single file
553
554        Args:
555        ------------
556        local : string
557            the local file to push
558        remote : string
559            the remote file (length max is 1024)
560        """
561        mode = self.read_mode(remote)
562        self.device.log.debug("Remote file %s mode is %d" % (remote, mode))
563        self.device.log.debug("%s execute command: hdc push %s %s" % (
564            convert_serial(self.device.device_sn), local, remote))
565        if str(mode).startswith("168"):
566            remote = "%s/%s" % (remote, os.path.basename(local))
567
568        try:
569            try:
570                remote_path_content = remote.encode(DEFAULT_ENCODING)
571            except UnicodeEncodeError as _:
572                remote_path_content = remote.encode("UTF-8")
573            if len(remote_path_content) > REMOTE_PATH_MAX_LENGTH:
574                raise HdcError(ErrorMessage.Hdc.Code_0304003)
575
576            # create the header for the action
577            # and send it. We use a custom try/catch block to make the
578            # difference between file and network IO exceptions.
579            msg = self.create_send_file_req(ID_SEND, remote_path_content,
580                                            FilePermission.mode_644)
581
582            HdcHelper.write(self.sock, msg)
583            flags = os.O_RDONLY
584            modes = stat.S_IWUSR | stat.S_IRUSR
585            with os.fdopen(os.open(local, flags, modes), "rb") as test_file:
586                while True:
587                    if platform.system() == "Linux":
588                        data = test_file.read(1024 * 4)
589                    else:
590                        data = test_file.read(SYNC_DATA_MAX)
591
592                    if not data:
593                        break
594
595                    buf = struct.pack(
596                        "%ds%ds%ds" % (len(ID_DATA), FORMAT_BYTES_LENGTH,
597                                       len(data)), ID_DATA,
598                        self.swap32bits_to_bytes(len(data)), data)
599                    self.sock.send(buf)
600        except Exception as exception:
601            self.device.log.error("exception %s" % exception)
602            raise exception
603
604        msg = self.create_req(ID_DONE, int(time.time()))
605        HdcHelper.write(self.sock, msg)
606        result = HdcHelper.read(self.sock, DATA_UNIT_LENGTH * 2)
607        if not self.check_result(result, ID_OKAY):
608            self.device.log.error("exception %s" % result)
609            raise HdcError(self.read_error_message(result))
610
611    def read_mode(self, path):
612        """
613        Returns the mode of the remote file.
614        Return an Integer containing the mode if all went well or null
615        """
616        msg = self.create_file_req(ID_STAT, path)
617        HdcHelper.write(self.sock, msg)
618
619        # read the result, in a byte array containing 4 ints
620        stat_result = HdcHelper.read(self.sock, DATA_UNIT_LENGTH * 4)
621        if not self.check_result(stat_result, ID_STAT):
622            return INVALID_MODE_CODE
623
624        return self.swap32bit_from_array(stat_result, DEFAULT_OFFSET_OF_INT)
625
626    def create_file_req(self, command, path):
627        """
628        Creates the data array for a file request. This creates an array with a
629        4 byte command + the remote file name.
630
631        Args:
632        ------------
633        command :
634            the 4 byte command (ID_STAT, ID_RECV, ...)
635        path : string
636            The path, as a byte array, of the remote file on which to execute
637            the command.
638
639        return:
640        ------------
641            return the byte[] to send to the device through hdc
642        """
643        if isinstance(path, str):
644            try:
645                path = path.encode(DEFAULT_ENCODING)
646            except UnicodeEncodeError as _:
647                path = path.encode("UTF-8")
648
649        return struct.pack(
650            "%ds%ds%ds" % (len(command), FORMAT_BYTES_LENGTH, len(path)),
651            command, self.swap32bits_to_bytes(len(path)), path)
652
653    def create_send_file_req(self, command, path, mode=0o644):
654        # make the mode into a string
655        mode_str = ",%s" % str(mode & FilePermission.mode_777)
656        mode_content = mode_str.encode(DEFAULT_ENCODING)
657        return struct.pack(
658            "%ds%ds%ds%ds" % (len(command), FORMAT_BYTES_LENGTH, len(path),
659                              len(mode_content)),
660            command, self.swap32bits_to_bytes(len(path) + len(mode_content)),
661            path, mode_content)
662
663    def create_req(self, command, value):
664        """
665        Create a command with a code and an int values
666        """
667        return struct.pack("%ds%ds" % (len(command), FORMAT_BYTES_LENGTH),
668                           command, self.swap32bits_to_bytes(value))
669
670    @staticmethod
671    def check_result(result, code):
672        """
673        Checks the result array starts with the provided code
674
675        Args:
676        ------------
677        result :
678            the result array to check
679        path : string
680            the 4 byte code
681
682        return:
683        ------------
684        bool
685            return true if the code matches
686        """
687        return result[0:4] == code[0:4]
688
689    def read_error_message(self, result):
690        """
691        Reads an error message from the opened Socket.
692
693        Args:
694        ------------
695        result :
696            the current hdc result. Must contain both FAIL and the length of
697            the message.
698        """
699        if self.check_result(result, ID_FAIL):
700            length = self.swap32bit_from_array(result, 4)
701            if length > 0:
702                return str(HdcHelper.read(self.sock, length))
703
704        return None
705
706    @staticmethod
707    def swap32bits_to_bytes(value):
708        """
709        Swaps an unsigned value around, and puts the result in an bytes that
710        can be sent to a device.
711
712        Args:
713        ------------
714        value :
715            the value to swap.
716        """
717        return bytes([value & 0x000000FF,
718                      (value & 0x0000FF00) >> 8,
719                      (value & 0x00FF0000) >> 16,
720                      (value & 0xFF000000) >> 24])
721
722    @staticmethod
723    def swap32bit_from_array(value, offset):
724        """
725        Reads a signed 32 bit integer from an array coming from a device.
726
727        Args:
728        ------------
729        value :
730            the array containing the int
731        offset:
732            the offset in the array at which the int starts
733
734        Return:
735        ------------
736        int
737            the integer read from the array
738        """
739        result = 0
740        result |= (int(value[offset])) & 0x000000FF
741        result |= (int(value[offset + 1]) & 0x000000FF) << 8
742        result |= (int(value[offset + 2]) & 0x000000FF) << 16
743        result |= (int(value[offset + 3]) & 0x000000FF) << 24
744
745        return result
746
747
class HdcHelper:
    """
    Static helpers that talk to the hdc server over a TCP socket.

    Each operation opens a connection with ``HdcHelper.socket``, performs the
    handshake (``handle_shake``) and then exchanges frames that are prefixed
    with a 4-byte big-endian length.
    """

    # Connector name. DeviceConnector.__init__ assigns it from
    # HdcMonitor.peek_hdc(); it is used for process checks and log messages.
    CONNECTOR_NAME = ""

    @staticmethod
    def check_if_hdc_running(timeout=30):
        """
        Poll once per second until the connector process is found.

        Args:
        ------------
        timeout : int
            approximate number of seconds to keep polling (s)

        Return:
        ------------
        bool
            True as soon as the process is running, otherwise False
        """
        LOG.debug("Check if {} is running, timeout is {}s".format(
            HdcHelper.CONNECTOR_NAME, timeout))
        # Probe at least once, even for timeout <= 1, and sleep only between
        # probes so the total wait stays at about (timeout - 1) seconds.
        attempts = max(int(timeout), 1)
        for attempt in range(attempts):
            if attempt:
                time.sleep(1)
            if is_proc_running(HdcHelper.CONNECTOR_NAME):
                return True
        return False

    @staticmethod
    def push_file(device, local, remote, is_create=False,
                  timeout=DEFAULT_TIMEOUT):
        """
        Send a file to the device with the "file send" command.

        Args:
        ------------
        device : IDevice
            target device (host, port, device_sn and log are used)
        local : string
            source path
        remote : string
            destination path on the device
        is_create : bool
            not used by this implementation; kept for interface compatibility
        timeout : int
            socket timeout (ms)
        """
        device.log.info("{} execute command: {} file send {} to {}".format(
            convert_serial(device.device_sn), HdcHelper.CONNECTOR_NAME, local, remote))
        HdcHelper._operator_file("file send", device, local, remote, timeout)

    @staticmethod
    def pull_file(device, remote, local, is_create=False,
                  timeout=DEFAULT_TIMEOUT):
        """
        Fetch a file from the device with the "file recv" command.

        Args:
        ------------
        device : IDevice
            source device (host, port, device_sn and log are used)
        remote : string
            source path on the device
        local : string
            destination path
        is_create : bool
            not used by this implementation; kept for interface compatibility
        timeout : int
            socket timeout (ms)
        """
        device.log.info("{} execute command: {} file recv {} to {}".format(
            convert_serial(device.device_sn), HdcHelper.CONNECTOR_NAME, remote, local))
        HdcHelper._operator_file("file recv", device, remote, local, timeout)

    @staticmethod
    def _install_remote_package(device, remote_file_path, command):
        """
        Run "bm install" for a package that is already on the device.

        Return:
        ------------
        string
            the collected output of the install command
        """
        receiver = CollectingOutputReceiver()
        cmd = "bm install -p %s %s" % (command.strip(), remote_file_path)
        HdcHelper.execute_shell_command(device, cmd, INSTALL_TIMEOUT, receiver)
        return receiver.output

    @staticmethod
    def install_package(device, package_file_path, command):
        """
        Push a package to /data/local/tmp, install it, then delete the copy.

        Args:
        ------------
        device : IDevice
            target device
        package_file_path : string
            path of the package to push
        command : string
            extra text placed after "bm install -p"

        Return:
        ------------
        string
            the collected output of the install command
        """
        device.log.info("%s install %s" % (convert_serial(device.device_sn),
                                           package_file_path))
        remote_file_path = "/data/local/tmp/%s" % os.path.basename(
            package_file_path)
        HdcHelper.push_file(device, package_file_path, remote_file_path)
        result = HdcHelper._install_remote_package(device, remote_file_path,
                                                   command)
        HdcHelper.execute_shell_command(device, "rm %s " % remote_file_path)
        return result

    @staticmethod
    def uninstall_package(device, package_name):
        """
        Run "bm uninstall -n <package_name>" on the device.

        Return:
        ------------
        string
            the collected output of the uninstall command
        """
        receiver = CollectingOutputReceiver()
        command = "bm uninstall -n %s " % package_name
        device.log.info("%s %s" % (convert_serial(device.device_sn), command))
        HdcHelper.execute_shell_command(device, command, INSTALL_TIMEOUT,
                                        receiver)
        return receiver.output

    @staticmethod
    def reboot(device, into=None):
        """
        Send "target boot" to the device and wait two seconds.

        Args:
        ------------
        device : IDevice
            device to reboot
        into :
            not used by this implementation; kept for interface compatibility
        """
        device.log.info("{} execute command: {} target boot".format(convert_serial(device.device_sn),
                                                                    HdcHelper.CONNECTOR_NAME))
        with HdcHelper.socket(host=device.host, port=device.port) as sock:
            HdcHelper.handle_shake(sock, device.device_sn)
            request = HdcHelper.form_hdc_request("target boot")
            HdcHelper.write(sock, request)
        # As of 2024/7/4, newer system images do not reboot immediately after
        # "hdc target boot", so wait 2s before returning.
        time.sleep(2)

    @staticmethod
    def _recv_exact(sock, length):
        """
        Receive exactly ``length`` bytes from ``sock``.

        A single recv() may return fewer bytes than requested, so keep
        reading until the requested amount has arrived. Fewer bytes are
        returned only when the peer closes the connection first. Socket
        exceptions (including socket.timeout) propagate to the caller.
        """
        chunks = []
        remaining = length
        while remaining > 0:
            chunk = sock.recv(remaining)
            if not chunk:
                break
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    @staticmethod
    def execute_shell_command(device, command, timeout=DEFAULT_TIMEOUT,
                              receiver=None, **kwargs):
        """
        Executes a shell command on the device and retrieve the output.

        Args:
        ------------
        device : IDevice
            on which to execute the command.
        command : string
            the shell command to execute
        timeout : int
            max time between command output. If more time passes between
            command output, the method will throw
            ShellCommandUnresponsiveException (ms). A falsy value selects
            DEFAULT_TIMEOUT.
        receiver : IShellReceiver
            receives every decoded output frame through __read__ and is
            always finished with __done__; without a receiver the output is
            written to the debug log.
        kwargs :
            output_flag (default True) logs the command line at info level
            when true and at debug level otherwise.

        Return:
        ------------
        HdcResponse
            response object with okay set to True

        Raises:
        ------------
        ShellCommandUnresponsiveException
            when the socket times out
        ExecuteTerminate
            when Binder reports that execution has been stopped
        """
        try:
            if not timeout:
                timeout = DEFAULT_TIMEOUT

            with HdcHelper.socket(host=device.host, port=device.port,
                                  timeout=timeout) as sock:
                output_flag = kwargs.get("output_flag", True)
                timeout_msg = " with timeout %ss" % str(timeout / 1000)
                message = "{} execute command: {} shell {}{}".format(convert_serial(device.device_sn),
                                                                     HdcHelper.CONNECTOR_NAME,
                                                                     convert_mac(command), timeout_msg)
                if output_flag:
                    LOG.info(message)
                else:
                    LOG.debug(message)
                from xdevice import Binder
                HdcHelper.handle_shake(sock, device.device_sn)
                request = HdcHelper.form_hdc_request("shell {}".format(command))
                HdcHelper.write(sock, request)
                resp = HdcResponse()
                resp.okay = True
                while True:
                    # Each frame is a length header followed by the payload.
                    # Both are read in full so that a short TCP read cannot
                    # desynchronise the stream.
                    len_buf = HdcHelper._recv_exact(sock, DATA_UNIT_LENGTH)
                    if len(len_buf) < DATA_UNIT_LENGTH:
                        # Connection closed: no further frames.
                        break
                    length = struct.unpack("!I", len_buf)[0]
                    data = HdcHelper._recv_exact(sock, length)
                    ret = HdcHelper.reply_to_string(data)
                    if ret:
                        if receiver:
                            receiver.__read__(ret)
                        else:
                            LOG.debug(ret)
                    if not Binder.is_executing():
                        raise ExecuteTerminate()
                return resp
        except socket.timeout as error:
            err_msg = ErrorMessage.Device.Code_0303013.format(
                convert_serial(device.device_sn), convert_mac(command), str(timeout / 1000))
            device.log.error(err_msg)
            raise ShellCommandUnresponsiveException() from error
        finally:
            if receiver:
                receiver.__done__()

    @staticmethod
    def set_device(device, sock):
        """
        Tells hdc to talk to a specific device by sending
        "host:transport:<device_sn>".

        Raises:
        ------------
        HdcCommandRejectedException
            when the response is not OKAY
        """
        msg = "host:transport:%s" % device.device_sn
        device_query = HdcHelper.form_hdc_request(msg)
        HdcHelper.write(sock, device_query)
        resp = HdcHelper.read_hdc_response(sock)
        if not resp.okay:
            raise HdcCommandRejectedException(ErrorMessage.Hdc.Code_0304005.format(resp.message))

    @staticmethod
    def form_hdc_request(req):
        """
        Build a request frame: a 4-byte big-endian length followed by the
        UTF-8 encoded, NUL-terminated request text.

        Args:
        ------------
        req : string
            request text; a trailing NUL is appended when missing

        Return:
        ------------
        bytes
            the packed frame

        Raises:
        ------------
        UnicodeEncodeError
            when the text cannot be encoded (logged, then re-raised)
        """
        try:
            if not req.endswith('\0'):
                req = "%s\0" % req
            req = req.encode("utf-8")
            fmt = "!I%ss" % len(req)
            result = struct.pack(fmt, len(req), req)
        except UnicodeEncodeError as ex:
            LOG.error(ex)
            raise
        return result

    @staticmethod
    def read_hdc_response(sock, read_diag_string=False):
        """
        Reads the response from HDC after a command.

        Args:
        ------------
        read_diag_string :
            If true, we're expecting an OKAY response to be followed by a
            diagnostic string. Otherwise, we only expect the diagnostic string
            to follow a FAIL.

        Return:
        ------------
        HdcResponse
            okay tells whether the reply started with OKAY; message holds
            the diagnostic string when one was read
        """
        resp = HdcResponse()
        reply = HdcHelper.read(sock, DATA_UNIT_LENGTH)
        resp.okay = HdcHelper.is_okay(reply)

        # A failure reply is always followed by a diagnostic string.
        if read_diag_string or not resp.okay:
            len_buf = HdcHelper.read(sock, DATA_UNIT_LENGTH)
            len_str = HdcHelper.reply_to_string(len_buf)
            msg = HdcHelper.read(sock, int(len_str, HEXADECIMAL_NUMBER))
            resp.message = HdcHelper.reply_to_string(msg)

        return resp

    @staticmethod
    def write(sock, req, timeout=10):
        """
        Send ``req`` on ``sock`` until everything is written or the time
        budget is used up.

        Args:
        ------------
        sock : socket
            connected socket
        req : bytes, string or list
            data to send; a string is UTF-8 encoded and a list is converted
            with bytes()
        timeout : int
            overall time budget (s). When it is exceeded the remaining data
            is dropped and only a debug message is logged.

        Raises:
        ------------
        DeviceError
            when send() reports a negative size
        """
        if isinstance(req, str):
            req = req.encode(DEFAULT_ENCODING)
        elif isinstance(req, list):
            req = bytes(req)

        start_time = time.time()
        while req:
            if time.time() - start_time > timeout:
                LOG.debug("Socket write timeout, timeout:%ss" % timeout)
                break

            size = sock.send(req)
            if size < 0:
                raise DeviceError(ErrorMessage.Device.Code_0303017)

            req = req[size:]
            time.sleep(5 / 1000)

    @staticmethod
    def read(sock, length, timeout=10):
        """
        Read up to ``length`` bytes from ``sock``.

        Reading stops early when the peer closes the connection or when the
        time budget is exceeded, so the result may be shorter than requested.

        Args:
        ------------
        sock : socket
            connected socket
        length : int
            number of bytes wanted
        timeout : int
            overall time budget, checked before every recv() (s)

        Return:
        ------------
        bytes
            the data received

        Raises:
        ------------
        ConnectionResetError
            when the connection is reset more often than the retry budget
            allows
        """
        data = b''
        recv_len = 0
        start_time = time.time()
        # Number of connection resets tolerated before giving up.
        exc_num = 3
        while length - recv_len > 0:
            if time.time() - start_time > timeout:
                LOG.debug("Socket read timeout, timout:%ss" % timeout)
                break
            try:
                recv = sock.recv(length - recv_len)
                if len(recv) > 0:
                    time.sleep(5 / 1000)
                else:
                    # Empty read: the peer closed the connection.
                    break
            except ConnectionResetError as error:
                if exc_num <= 0:
                    raise error
                exc_num = exc_num - 1
                recv = b''
                time.sleep(1)
                LOG.debug("ConnectionResetError occurs")

            data += recv
            recv_len += len(recv)

        return data

    @staticmethod
    def is_okay(reply):
        """
        Checks to see if the first four bytes in "reply" are OKAY.
        """
        return reply[0:4] == ID_OKAY

    @staticmethod
    def reply_to_string(reply):
        """
        Converts an HDC reply to a string.

        DEFAULT_ENCODING is tried first, then COMPATIBLE_ENCODING; an empty
        string is returned when neither conversion succeeds.
        """
        for encoding in [DEFAULT_ENCODING, COMPATIBLE_ENCODING]:
            try:
                return str(reply, encoding=encoding)
            except (ValueError, TypeError):
                continue
        return ""

    @staticmethod
    def socket(host=None, port=None, timeout=None):
        """
        Open a TCP connection to the hdc server.

        Waits (polling every 2s, for at most 10 minutes) until
        HdcMonitor.MONITOR_MAP holds a monitor for ``host`` whose
        main_hdc_connection is set, then connects to (host, port).

        Args:
        ------------
        host : string
            hdc server address
        port : int or string
            hdc server port
        timeout : int
            socket timeout (ms); None leaves the socket in blocking mode

        Return:
        ------------
        socket
            the connected socket; the caller is responsible for closing it

        Raises:
        ------------
        HdcError
            when no usable monitor appears within the waiting period
        socket.error
            when the connection cannot be established (logged, re-raised)
        """
        end = time.time() + 10 * 60
        while True:
            # Look the monitor up on every pass: it may be registered, or
            # get its main connection, while we are waiting.
            monitor = HdcMonitor.MONITOR_MAP.get(host)
            if monitor is not None and \
                    monitor.main_hdc_connection is not None:
                break
            LOG.debug("Host: %s, port: %s, HdcMonitor map is %s" % (
                host, port, HdcMonitor.MONITOR_MAP))
            if monitor is not None:
                LOG.debug("Monitor main hdc connection is %s" %
                          monitor.main_hdc_connection)
            if time.time() > end:
                raise HdcError(ErrorMessage.Hdc.Code_0304006)
            time.sleep(2)

        # Resolve the address first so that a bad port cannot leave an
        # already created socket behind.
        address = (host, int(port))
        sock = None
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.connect(address)
        except socket.error as exception:
            if sock is not None:
                sock.close()
            LOG.exception("Connect hdc server error: %s" % str(exception),
                          exc_info=False)
            raise

        if timeout is not None:
            # settimeout() alone selects the timeout mode.
            sock.settimeout(timeout / 1000)

        return sock

    @staticmethod
    def handle_shake(connection, connect_key=""):
        """
        Perform the handshake on a freshly opened connection.

        Reads the 48-byte greeting and replies with a frame holding the
        banner b'OHOS HDC' and the connect key.

        Args:
        ------------
        connection : socket
            connected socket
        connect_key : string
            key sent in the reply (callers pass the device serial number)

        Return:
        ------------
        bool
            always True

        Raises:
        ------------
        struct.error
            when the greeting is not exactly 48 bytes long
        """
        reply = HdcHelper.read(connection, 48)
        # The fields are not used; unpacking only validates the size.
        struct.unpack(">I12s32s", reply)
        banner_str = b'OHOS HDC'
        connect_key = connect_key.encode("utf-8")
        size = struct.calcsize('12s256s')
        fmt = "!I12s256s"
        pack_cmd = struct.pack(fmt, size, banner_str, connect_key)
        HdcHelper.write(connection, pack_cmd)
        return True

    @staticmethod
    def _operator_file(command, device, local, remote, timeout):
        """
        Send "<command> <local> <remote>" and log the first reply frame.

        Args:
        ------------
        command : string
            the file command, e.g. "file send" or "file recv"
        device : IDevice
            device to operate on
        local : string
            first path argument of the command
        remote : string
            second path argument of the command
        timeout : int
            socket timeout (ms)
        """
        # The context manager closes the socket on every exit path.
        with HdcHelper.socket(host=device.host, port=device.port,
                              timeout=timeout) as sock:
            HdcHelper.handle_shake(sock, device.device_sn)
            request = HdcHelper.form_hdc_request(
                "%s %s %s" % (command, local, remote))
            HdcHelper.write(sock, request)
            reply = HdcHelper.read(sock, DATA_UNIT_LENGTH)
            length = struct.unpack("!I", reply)[0]
            data_buf = HdcHelper.read(sock, length)
            # reply_to_string falls back to a second encoding, so a reply
            # that is not valid UTF-8 cannot break the logging.
            LOG.debug(HdcHelper.reply_to_string(data_buf).strip())

    @staticmethod
    def is_hdc_std():
        """
        Tell whether CONNECTOR_NAME contains HDC_STD_NAME.
        """
        return HDC_STD_NAME in HdcHelper.CONNECTOR_NAME
1068
1069
class DeviceConnector(object):
    """
    Owns the hdc monitor for one host/port pair and forwards device events
    (connected, disconnected, changed) to the registered listeners.
    """
    __instance = None
    __init_flag = False

    def __init__(self, host=None, port=None, usb_type=None):
        """
        Args:
        ------------
        host : string
            hdc server address; "127.0.0.1" when empty
        port : int or string
            hdc server port; when empty, the OHOS_HDC_SERVER_PORT
            environment variable or DEFAULT_STD_PORT is used
        usb_type :
            stored as given
        """
        if DeviceConnector.__init_flag:
            return
        self.device_listeners = []
        self.device_monitor = None
        self.monitor_lock = threading.Condition()
        self.host = host or "127.0.0.1"
        self.usb_type = usb_type
        # Publish the connector name for HdcHelper before the port is parsed.
        HdcHelper.CONNECTOR_NAME = HdcMonitor.peek_hdc()
        raw_port = port if port else os.getenv("OHOS_HDC_SERVER_PORT",
                                               DEFAULT_STD_PORT)
        self.port = int(raw_port)

    def start(self):
        """Fetch the monitor for this host/port and start it."""
        monitor = HdcMonitor.get_instance(
            self.host, self.port, device_connector=self)
        self.device_monitor = monitor
        monitor.start()

    def terminate(self):
        """Stop the monitor, if there is one, and forget it."""
        monitor = self.device_monitor
        if monitor:
            monitor.stop()
        self.device_monitor = None

    def add_device_change_listener(self, device_change_listener):
        """Register a listener for device events."""
        self.device_listeners.append(device_change_listener)

    def remove_device_change_listener(self, device_change_listener):
        """Unregister a listener; unknown listeners are ignored."""
        try:
            self.device_listeners.remove(device_change_listener)
        except ValueError:
            pass

    def device_connected(self, device):
        """Tell every listener that the device is connected."""
        details = (self.host, self.port, device.device_sn)
        LOG.debug("DeviceConnector device connected:host %s, port %s, "
                  "device sn %s " % details)
        if not (device.host == self.host and device.port == self.port):
            LOG.debug("DeviceConnector device error")
        for listener in self.device_listeners:
            listener.device_connected(device)

    def device_disconnected(self, device):
        """Tell every listener that the device is disconnected."""
        details = (self.host, self.port, device.device_sn)
        LOG.debug("DeviceConnector device disconnected:host %s, port %s, "
                  "device sn %s" % details)
        if not (device.host == self.host and device.port == self.port):
            LOG.debug("DeviceConnector device error")
        for listener in self.device_listeners:
            listener.device_disconnected(device)

    def device_changed(self, device):
        """Tell every listener that the device has changed."""
        details = (self.host, self.port, device.device_sn)
        LOG.debug("DeviceConnector device changed:host %s, port %s, "
                  "device sn %s" % details)
        if not (device.host == self.host and device.port == self.port):
            LOG.debug("DeviceConnector device error")
        for listener in self.device_listeners:
            listener.device_changed(device)
1129
1130
class CollectingOutputReceiver(IShellReceiver):
    """Shell receiver that accumulates everything it reads in ``output``."""

    def __init__(self):
        # Concatenation of every chunk passed to __read__.
        self.output = ""

    def __read__(self, output):
        """Append one chunk of command output."""
        self.output = "{}{}".format(self.output, output)

    def __error__(self, message):
        """Errors are ignored."""
        return None

    def __done__(self, result_code="", message=""):
        """Nothing to finish."""
        return None
1143
1144
class DisplayOutputReceiver(IShellReceiver):
    """
    Shell receiver that keeps the whole output and logs it line by line.

    Output may arrive in chunks that end in the middle of a line; the
    incomplete tail is held in ``unfinished_line`` until the rest arrives,
    or until ``__done__`` flushes it.
    """

    def __init__(self):
        # Everything received so far.
        self.output = ""
        # Tail of the last chunk that has not been terminated yet.
        self.unfinished_line = ""

    def _process_output(self, output, end_mark="\n"):
        """
        Split a chunk into complete lines.

        Args:
        ------------
        output : string
            the chunk that has just been received
        end_mark : string
            line terminator

        Return:
        ------------
        list
            the complete lines, without their terminator
        """
        content = output
        if self.unfinished_line:
            content = "".join((self.unfinished_line, content))
            self.unfinished_line = ""
        lines = content.split(end_mark)
        # The last element is "" when the content ends with end_mark and the
        # incomplete tail otherwise; either way it is not a complete line.
        self.unfinished_line = lines.pop()
        return lines

    def __read__(self, output):
        """Store the chunk and log every non-blank complete line."""
        self.output = "%s%s" % (self.output, output)
        for line in self._process_output(output):
            line = line.strip()
            if line:
                LOG.info(line)

    def __error__(self, message):
        """Errors are ignored."""
        pass

    def __done__(self, result_code="", message=""):
        """
        Log the pending incomplete line, if any.

        Without this, output that does not end with a line terminator would
        be stored in ``output`` but never displayed.
        """
        pending = self.unfinished_line.strip()
        self.unfinished_line = ""
        if pending:
            LOG.info(pending)
1178
1179
def process_command_ret(ret, receiver):
    """
    Deliver a command result to the receiver, or log it when there is none.

    Args:
    ------------
    ret : string
        command output; an empty string is ignored
    receiver : IShellReceiver
        gets the output through __read__ followed by __done__; when it is
        falsy every non-blank line is written to the debug log instead

    Raises:
    ------------
    ReportException
        when the receiver raises while handling the output
    """
    try:
        if ret != "" and receiver:
            receiver.__read__(ret)
            receiver.__done__()
    except Exception as error:
        LOG.exception(ErrorMessage.Common.Code_0301014, exc_info=False)
        raise ReportException() from error

    # Nothing left to do when there is no output or a receiver took it.
    if ret == "" or receiver:
        return

    stripped_lines = (raw.strip() for raw in ret.split("\n"))
    for text in stripped_lines:
        if text:
            LOG.debug(text)
1195