• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import platform
21import socket
22import struct
23import threading
24import time
25import shutil
26import stat
27from dataclasses import dataclass
28
29from xdevice import DeviceOsType
30from xdevice import ReportException
31from xdevice import ExecuteTerminate
32from xdevice import platform_logger
33from xdevice import Plugin
34from xdevice import get_plugin
35from xdevice import IShellReceiver
36from xdevice import exec_cmd
37from xdevice import get_file_absolute_path
38from xdevice import FilePermission
39from xdevice import DeviceError
40from xdevice import HdcError
41from xdevice import HdcCommandRejectedException
42from xdevice import ShellCommandUnresponsiveException
43from xdevice import DeviceState
44from xdevice import convert_serial
45from xdevice import is_proc_running
46from xdevice import convert_ip
47from xdevice import create_dir
48
# Four-byte message tags of the sync protocol spoken by SyncService.
ID_OKAY = b"OKAY"
ID_FAIL = b"FAIL"
ID_STAT = b"STAT"
ID_RECV = b"RECV"
ID_DATA = b"DATA"
ID_DONE = b"DONE"
ID_SEND = b"SEND"
ID_LIST = b"LIST"
ID_DENT = b"DENT"

# Encoding tried first when a remote path is turned into bytes.
DEFAULT_ENCODING = "ISO-8859-1"
# Largest payload accepted in (or read for) a single DATA message.
SYNC_DATA_MAX = 64 * 1024
# Upper bound for the encoded length of a remote path.
REMOTE_PATH_MAX_LENGTH = 1024
SOCK_DATA_MAX = 256

# Timeouts, in milliseconds.
INSTALL_TIMEOUT = 2 * 60 * 1000
DEFAULT_TIMEOUT = 40 * 1000

# Failed connection attempts tolerated before the monitor gives up.
MAX_CONNECT_ATTEMPT_COUNT = 10
# Size in bytes of one protocol word (message tag or length field).
DATA_UNIT_LENGTH = 4
HEXADECIMAL_NUMBER = 16
# Mode that pull_file skips: a symbolic link with rwxrwxrwx (== 41471).
SPECIAL_FILE_MODE = stat.S_IFLNK | 0o777
# Width in bytes of the length field inside a sync request.
FORMAT_BYTES_LENGTH = 4
# Offset of the first integer in a reply, right after the 4-byte tag.
DEFAULT_OFFSET_OF_INT = 4

# Returned by SyncService.read_mode when the reply is not a STAT reply.
INVALID_MODE_CODE = -1
DEFAULT_STD_PORT = 8710
# Executable names of the two connector flavours.
HDC_NAME = "hdc"
HDC_STD_NAME = "hdc_std"
# Module-level logger used by the LOG.* calls throughout this file.
LOG = platform_logger("Hdc")
79
80
class HdcMonitor:
    """
    A Device monitor.
    This monitor connects to the Device Connector, gets device and
    debuggable process information from it.

    A background thread (see start / loop_monitor) keeps one socket open to
    the hdc server, sends "list targets -v" about once per second and
    forwards connect / disconnect / state-change events to the
    device_connector given to the constructor. Monitors are cached per host
    in MONITOR_MAP.
    """
    # host -> HdcMonitor; shared by all instances and emptied by stop().
    MONITOR_MAP = {}

    def __init__(self, host="127.0.0.1", port=None, device_connector=None):
        """
        Args:
            host: address of the hdc server to monitor.
            port: port of the hdc server; when None, init_hdc() replaces it
                with DEFAULT_STD_PORT.
            device_connector: object notified through device_connected,
                device_disconnected and device_changed; its monitor_lock is
                taken around every poll.
        """
        self.channel = {"host": host, "port": port}
        self.main_hdc_connection = None
        self.connection_attempt = 0
        self.is_stop = False
        self.monitoring = False
        self.server = device_connector
        self.devices = []
        self.last_msg_len = 0
        self.changed = True

    @staticmethod
    def get_instance(host, port=None, device_connector=None):
        """
        Return the monitor cached for host, creating it on first use.

        port and device_connector are only used when a new monitor is
        created; an already cached monitor is returned unchanged.
        """
        if host not in HdcMonitor.MONITOR_MAP:
            HdcMonitor.MONITOR_MAP[host] = HdcMonitor(
                host, port, device_connector)
            LOG.debug("HdcMonitor map add host %s, map is %s" %
                      (host, HdcMonitor.MONITOR_MAP))

        return HdcMonitor.MONITOR_MAP[host]

    def start(self):
        """
        Starts the monitoring.

        Runs loop_monitor in a daemon thread named "HdcMonitor".
        """
        try:
            # daemon= replaces Thread.setDaemon(), deprecated since 3.10.
            server_thread = threading.Thread(target=self.loop_monitor,
                                             name="HdcMonitor", args=(),
                                             daemon=True)
            server_thread.start()
        except FileNotFoundError as _:
            LOG.error("HdcMonitor can't find connector, init device "
                      "environment failed!")

    def init_hdc(self, connector_name=HDC_NAME):
        """
        Make sure a connector server is available before connecting.

        Logs an error when connector_name is not on PATH, falls back to
        DEFAULT_STD_PORT when no port was configured, and starts the
        connector when no such process is running.
        """
        if shutil.which(connector_name) is None:
            LOG.error("Can not find {} or {} environment variable, "
                      "please set it first!".format(HDC_NAME, HDC_STD_NAME))

        # __init__ always stores a "port" key (possibly None), so a
        # setdefault() here could never apply the default; without it
        # open_hdc_connection() fails on int(None) on every loop pass.
        if self.channel.get("port") is None:
            self.channel["port"] = DEFAULT_STD_PORT

        if not is_proc_running(connector_name):
            self.start_hdc(connector=connector_name,
                           local_port=self.channel.get("port"))
            time.sleep(1)

    def _init_hdc_connection(self):
        """
        Open main_hdc_connection, retrying every two seconds.

        All known devices are first reported as offline. Does nothing when
        a connection already exists.

        Raises:
            HdcError: after more than MAX_CONNECT_ATTEMPT_COUNT failed
                attempts; is_stop is set so the monitor loop ends.
        """
        if self.main_hdc_connection is not None:
            return
        # set all devices disconnect
        for known_device in reversed(list(self.devices)):
            known_device.device_state = DeviceState.OFFLINE
            self.server.device_changed(known_device)

        connector_name = HDC_STD_NAME if HdcHelper.is_hdc_std() else HDC_NAME
        self.init_hdc(connector_name)
        self.connection_attempt = 0
        self.monitoring = False
        while self.main_hdc_connection is None:
            self.main_hdc_connection = self.open_hdc_connection()
            if self.main_hdc_connection is not None:
                break

            self.connection_attempt += 1
            if self.connection_attempt > MAX_CONNECT_ATTEMPT_COUNT:
                self.is_stop = True
                LOG.error(
                    "HdcMonitor attempt %s, can't connect to hdc "
                    "for Device List Monitoring" %
                    str(self.connection_attempt))
                # The key is "port"; the former "post" always printed None.
                raise HdcError(
                    "HdcMonitor cannot connect hdc server(%s %s),"
                    " please check!" %
                    (self.channel.get("host"),
                     str(self.channel.get("port"))))

            LOG.debug(
                "HdcMonitor Connection attempts: %s" %
                str(self.connection_attempt))
            time.sleep(2)

    def stop(self):
        """
        Stops the monitoring.

        This stops every monitor registered in MONITOR_MAP, not only this
        instance, and then clears the map.
        """
        for host, monitor in list(HdcMonitor.MONITOR_MAP.items()):
            LOG.debug("HdcMonitor stop host %s" % host)
            monitor.is_stop = True
            # Work on a local reference: the monitor thread may reset the
            # attribute to None at any time.
            connection = monitor.main_hdc_connection
            if connection is None:
                continue
            try:
                try:
                    connection.shutdown(socket.SHUT_RDWR)
                finally:
                    # Close even when shutdown() fails, so that the
                    # descriptor is not leaked.
                    connection.close()
            except (socket.error, socket.gaierror, socket.timeout) as _:
                LOG.error("HdcMonitor close socket exception")
            finally:
                monitor.main_hdc_connection = None
        HdcMonitor.MONITOR_MAP.clear()
        LOG.debug("HdcMonitor {} monitor stop!".format(HdcHelper.CONNECTOR_NAME))
        LOG.debug("HdcMonitor map is %s" % HdcMonitor.MONITOR_MAP)

    def loop_monitor(self):
        """
        Monitors the devices. This connects to the Debug Bridge

        Runs until is_stop is set: (re)connects when needed, polls the
        target list once per second and, on any error, drops the connection
        and retries two seconds later.
        """
        LOG.debug("current connector name is %s" % HdcHelper.CONNECTOR_NAME)
        while not self.is_stop:
            try:
                if self.main_hdc_connection is None:
                    self._init_hdc_connection()
                    if self.main_hdc_connection is not None:
                        LOG.debug(
                            "HdcMonitor Connected to hdc for device "
                            "monitoring, main_hdc_connection is %s" %
                            self.main_hdc_connection)

                self.list_targets()
                time.sleep(1)
            except (HdcError, Exception) as _:
                self.handle_exception_monitor_loop()
                time.sleep(2)

    def handle_exception_monitor_loop(self):
        """
        Drop the current connection so that the next loop pass reconnects.
        """
        LOG.debug("Handle exception monitor loop: %s" %
                  self.main_hdc_connection)
        if self.main_hdc_connection is None:
            return
        LOG.debug("Handle exception monitor loop, main hdc connection closed, "
                  "main hdc connection: %s" % self.main_hdc_connection)
        self.main_hdc_connection.close()
        self.main_hdc_connection = None

    def _get_device_instance(self, items, os_type):
        """
        Build a device object from one parsed line of the target list.

        Args:
            items: tab-separated fields of the line; items[0] is used as the
                serial number and items[3] as the state token.
            os_type: plugin id used to look up the device plugin.

        Returns:
            a new instance of the plugin's device class with serial, host,
            port and device_state filled in.
        """
        device = get_plugin(plugin_type=Plugin.DEVICE, plugin_id=os_type)[0]
        device_instance = device.__class__()
        device_instance.__set_serial__(items[0])
        device_instance.host = self.channel.get("host")
        device_instance.port = self.channel.get("port")
        state = DeviceState.get_state(items[3])
        # Only log when the reply length changed, to avoid a line per poll.
        if self.changed:
            if state == DeviceState.CONNECTED:
                LOG.debug("Dmlib get device instance {} {} {}, status: {}".format(
                    device_instance.device_sn, device_instance.host, device_instance.port, items[3]))
            else:
                LOG.debug("Dmlib ignore device instance {} {} {}, status: {}".format(
                    device_instance.device_sn, device_instance.host, device_instance.port, items[3]))
        device_instance.device_state = state
        return device_instance

    def update_devices(self, param_array_list):
        """
        Reconcile the known devices with a freshly reported list.

        Known devices missing from the report are removed and reported as
        disconnected, matching devices (same serial and OS type) get their
        state refreshed, and the remaining reported devices are added.

        Args:
            param_array_list: devices reported by the server. The list is
                modified in place: matched entries are removed from it.
        """
        for known_device in reversed(list(self.devices)):
            for reported in param_array_list:
                if known_device.device_sn != reported.device_sn or \
                        known_device.device_os_type != \
                        reported.device_os_type:
                    continue
                if known_device.device_state != reported.device_state:
                    known_device.device_state = reported.device_state
                    self.server.device_changed(known_device)
                param_array_list.remove(reported)
                break
            else:
                # No reported entry matched: the device is gone.
                self.devices.remove(known_device)
                self.server.device_disconnected(known_device)

        for new_device in param_array_list:
            self.devices.append(new_device)
            self.server.device_connected(new_device)

    def open_hdc_connection(self):
        """
        Attempts to connect to the debug bridge server. Return a connect socket
        if success, None otherwise.
        """
        host = self.channel.get("host")
        port = self.channel.get("port")
        sock = None
        try:
            LOG.debug(
                "HdcMonitor connecting to hdc for Device List Monitoring")
            LOG.debug("HdcMonitor socket connection host: %s, port: %s" %
                      (str(convert_ip(host)), str(int(port))))

            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.connect((host, int(port)))
            return sock

        except (socket.error, socket.gaierror, socket.timeout) as exception:
            LOG.error("HdcMonitor hdc socket connection Error: %s, "
                      "host is %s, port is %s" % (str(exception), host, port))
            # Do not leak the half-open socket when connect() fails.
            if sock is not None:
                sock.close()
            return None

    def start_hdc(self, connector=HDC_NAME, kill=False, local_port=None):
        """Starts the hdc host side server.

        Args:
            connector: connector type, like "hdc"
            kill: if True, kill exist host side server
            local_port: accepted for compatibility; not passed on to the
                connector command

        Returns:
            None
        """
        if kill:
            LOG.debug("HdcMonitor {} kill".format(connector))
            exec_cmd([connector, "kill"])
        LOG.debug("HdcMonitor {} start".format(connector))
        exec_cmd(
            [connector, "-l5", "start"],
            error_print=False, redirect=True)

    def list_targets(self):
        """
        Run one poll: request the target list and process the reply.

        Does nothing without a connection. The poll runs under
        server.monitor_lock; when the lock cannot be taken within one
        second the poll is skipped. Any error is logged and re-raised.
        """
        if not self.main_hdc_connection:
            return

        lock = self.server.monitor_lock
        # Releasing a lock that was never acquired raises (or, for a plain
        # Lock, releases it on behalf of its real owner), so only continue
        # when acquire() did not report a timeout.
        if lock.acquire(timeout=1) is False:
            LOG.debug("HdcMonitor monitor lock is busy, skip this poll")
            return
        try:
            self.monitoring_list_targets()
            len_buf = HdcHelper.read(self.main_hdc_connection,
                                     DATA_UNIT_LENGTH)
            # "!I" is unsigned, so the length can never be negative.
            length = struct.unpack("!I", len_buf)[0]
            if self.last_msg_len != length:
                LOG.debug("had received length is: %s" % length)
                self.last_msg_len = length
                self.changed = True
            else:
                self.changed = False
            self.connection_attempt = 0
            self.process_incoming_target_data(length)
        except Exception as error:
            LOG.error(error)
            raise
        finally:
            lock.release()

    def monitoring_list_targets(self):
        """
        Send the "list targets -v" request.

        On the first call for a connection this also performs the handshake
        and sends an "alive" request.
        """
        if not self.monitoring:
            HdcHelper.handle_shake(self.main_hdc_connection)
            request = HdcHelper.form_hdc_request("alive")
            HdcHelper.write(self.main_hdc_connection, request)
            self.monitoring = True
        request = HdcHelper.form_hdc_request('list targets -v')
        HdcHelper.write(self.main_hdc_connection, request)

    def process_incoming_target_data(self, length):
        """
        Read a target-list reply of the given length and apply it.

        Lines with an empty first field or fewer than five tab-separated
        fields are ignored; a reply containing "Empty" means no devices.
        """
        local_array_list = []
        data_buf = HdcHelper.read(self.main_hdc_connection, length)
        data_str = HdcHelper.reply_to_string(data_buf)
        if 'Empty' in data_str:
            if self.changed:
                LOG.debug("please check device actually.[%s]" % data_str.strip())
        else:
            for line in data_str.split('\n'):
                items = line.strip().split('\t')
                # Example: sn    USB     Offline localhost       hdc
                if not items[0] or len(items) < 5:
                    continue
                local_array_list.append(self._get_device_instance(
                    items, DeviceOsType.default))
        self.update_devices(local_array_list)

    @staticmethod
    def peek_hdc():
        """
        Return the connector name to use: HDC_STD_NAME when that executable
        is found on PATH, HDC_NAME otherwise.
        """
        LOG.debug("Peek running process to check expect connector.")
        # if not find hdc_std, try find hdc
        connector_name = HDC_STD_NAME
        if shutil.which(connector_name) is None:
            connector_name = HDC_NAME
        LOG.debug("Peak end")
        return connector_name
372
373
@dataclass
class HdcResponse:
    """
    Response from HDC.

    The attributes carry annotations so that @dataclass actually treats
    them as fields (un-annotated class attributes are ignored by it).
    HdcResponse() with no arguments keeps working and yields the same
    defaults as before.
    """
    # Truthy when the response started with "OKAY". Typed loosely because
    # the default is the raw tag while callers in this file only test its
    # truthiness.
    okay: object = ID_OKAY
    # Diagnostic string if okay is false.
    message: str = ""
379
380
class SyncService:
    """
    Sync service class to push/pull to/from devices/emulators,
    through the debug bridge.

    Typical use: create the service, call open_sync(), then push_file() /
    pull_file(), and finally close().
    """

    def __init__(self, device, host=None, port=None):
        """
        Args:
            device: device to talk to; its log and device_sn are used.
            host: address of the hdc server.
            port: port of the hdc server.
        """
        self.device = device
        self.host = host
        self.port = port
        self.sock = None

    def open_sync(self, timeout=DEFAULT_TIMEOUT):
        """
        Opens the sync connection. This must be called before any calls to
        push[File] / pull[File].

        Args:
            timeout: socket timeout in milliseconds.

        Raises:
            HdcError: if hdc refuses the sync request. This can happen when
                the device is invalid. On any failure the socket is closed
                again before the error propagates.
        """
        LOG.debug("Open sync, timeout=%s" % int(timeout / 1000))
        self.sock = HdcHelper.socket(host=self.host, port=self.port,
                                     timeout=timeout)
        try:
            HdcHelper.set_device(self.device, self.sock)

            request = HdcHelper.form_hdc_request("sync:")
            HdcHelper.write(self.sock, request)

            resp = HdcHelper.read_hdc_response(self.sock)
            if not resp.okay:
                message = "Got unhappy response from HDC sync req: %s" % \
                          resp.message
                self.device.log.error(message)
                raise HdcError(message)
        except Exception:
            # Do not leave a half-initialised socket behind.
            self.close()
            raise

    def close(self):
        """
        Closes the connection.
        """
        if self.sock is None:
            return
        try:
            self.sock.close()
        except socket.error as error:
            LOG.error("Socket close error: %s" % error, error_no="00420")
        finally:
            self.sock = None

    def pull_file(self, remote, local, is_create=False):
        """
        Pulls a file or, recursively, a directory.
        The top directory won't be created if is_create is False (by default)
        and vice versa

        Symbolic links with mode SPECIAL_FILE_MODE are skipped.

        Raises:
            HdcError: if the remote object does not exist (mode 0) or the
                transfer fails.
        """
        mode = self.read_mode(remote)
        self.device.log.debug("Remote file %s mode is %d" % (remote, mode))
        if mode == 0:
            raise HdcError("Remote object doesn't exist!")

        if self._is_directory_mode(mode):
            if is_create:
                # Use the last path component, ignoring a trailing slash.
                head, tail = os.path.split(remote)
                new_local = os.path.join(
                    local, os.path.basename(tail or head))
                create_dir(new_local)
            else:
                new_local = local

            collect_receiver = CollectingOutputReceiver()
            HdcHelper.execute_shell_command(self.device, "ls %s" % remote,
                                            receiver=collect_receiver)
            # NOTE(review): splitting on whitespace breaks entries whose
            # names contain spaces.
            for file_name in collect_receiver.output.split():
                self.pull_file("%s/%s" % (remote, file_name),
                               new_local, is_create=True)
        elif mode == SPECIAL_FILE_MODE:
            self.device.log.info("skipping special file '%s'" % remote)
        else:
            if os.path.isdir(local):
                local = os.path.join(local, os.path.basename(remote))

            self.do_pull_file(remote, local)

    def do_pull_file(self, remote, local):
        """
        Pulls a remote file

        The local file is created if needed and truncated otherwise, so
        pulling onto an existing file replaces its content.

        Raises:
            HdcError: if the remote path is too long, the device answers
                with something other than DATA/DONE, or a chunk is larger
                than SYNC_DATA_MAX.
        """
        self.device.log.info(
            "%s pull %s to %s" % (convert_serial(self.device.device_sn),
                                  remote, local))
        remote_path_content = self._encode_remote_path(remote)
        if len(remote_path_content) > REMOTE_PATH_MAX_LENGTH:
            raise HdcError("Remote path is too long.")

        msg = self.create_file_req(ID_RECV, remote_path_content)
        HdcHelper.write(self.sock, msg)
        pull_result = HdcHelper.read(self.sock, DATA_UNIT_LENGTH * 2)
        if not (self.check_result(pull_result, ID_DATA) or
                self.check_result(pull_result, ID_DONE)):
            raise HdcError(self.read_error_message(pull_result))

        # O_TRUNC rather than O_APPEND: appending would glue the new data
        # to whatever an earlier pull left in the file. O_BINARY only
        # exists on Windows.
        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | \
            getattr(os, "O_BINARY", 0)
        pulled_file_open = os.open(local, flags, FilePermission.mode_755)
        with os.fdopen(pulled_file_open, "wb") as pulled_file:
            while not self.check_result(pull_result, ID_DONE):
                if not self.check_result(pull_result, ID_DATA):
                    raise HdcError(self.read_error_message(pull_result))

                length = self._read_chunk_length(pull_result)
                if length > SYNC_DATA_MAX:
                    raise HdcError("Receiving too much data.")

                pulled_file.write(HdcHelper.read(self.sock, length))
                pulled_file.flush()
                pull_result = self.sock.recv(DATA_UNIT_LENGTH * 2)

    def _read_chunk_length(self, header):
        """
        Return the payload length announced by a DATA header.

        When the header holds only the 4-byte tag (short read), the length
        word is fetched from the socket with a second recv.
        """
        try:
            return self.swap32bit_from_array(header, DEFAULT_OFFSET_OF_INT)
        except IndexError:
            self.device.log.debug("do_pull_file: %s" % str(header))
            if header != ID_DATA:
                raise
            length_bytes = self.sock.recv(DATA_UNIT_LENGTH)
            self.device.log.debug("do_pull_file: %s" % str(length_bytes))
            length = self.swap32bit_from_array(length_bytes, 0)
            self.device.log.debug("do_pull_file: %s" % str(length))
            return length

    def push_file(self, local, remote, is_create=False):
        """
        Push a single file or, recursively, a directory.
        The top directory won't be created if is_create is False (by default)
        and vice versa

        Raises:
            HdcError: if the local path does not exist or a transfer fails.
        """
        if not os.path.exists(local):
            raise HdcError("Local path doesn't exist.")

        if not os.path.isdir(local):
            self.do_push_file(local, remote)
            return

        if is_create:
            # Use the last path component, ignoring a trailing separator.
            head, tail = os.path.split(local)
            remote = "{}/{}".format(remote, os.path.basename(tail or head))
            HdcHelper.execute_shell_command(
                self.device, "mkdir -p %s" % remote)

        for child in os.listdir(local):
            file_path = os.path.join(local, child)
            remote_child = "%s/%s" % (remote, child)
            if os.path.isdir(file_path):
                self.push_file(file_path, remote_child, is_create=False)
            else:
                self.do_push_file(file_path, remote_child)

    def do_push_file(self, local, remote):
        """
        Push a single file

        Args:
        ------------
        local : string
            the local file to push
        remote : string
            the remote file (length max is 1024); when it is an existing
            directory the file is placed inside it under its own name

        Raises:
        ------------
        HdcError
            if the remote path is too long or the device does not
            acknowledge the transfer with OKAY.
        """
        mode = self.read_mode(remote)
        self.device.log.debug("Remote file %s mode is %d" % (remote, mode))
        self.device.log.debug("%s execute command: hdc push %s %s" % (
            convert_serial(self.device.device_sn), local, remote))
        if self._is_directory_mode(mode):
            remote = "%s/%s" % (remote, os.path.basename(local))

        try:
            remote_path_content = self._encode_remote_path(remote)
            if len(remote_path_content) > REMOTE_PATH_MAX_LENGTH:
                raise HdcError("Remote path is too long.")

            # create the header for the action and send it.
            msg = self.create_send_file_req(ID_SEND, remote_path_content,
                                            FilePermission.mode_644)
            HdcHelper.write(self.sock, msg)

            # The chunk size does not change while the file is sent.
            if platform.system() == "Linux":
                chunk_size = 1024 * 4
            else:
                chunk_size = SYNC_DATA_MAX
            flags = os.O_RDONLY | getattr(os, "O_BINARY", 0)
            modes = stat.S_IWUSR | stat.S_IRUSR
            with os.fdopen(os.open(local, flags, modes), "rb") as test_file:
                for data in iter(lambda: test_file.read(chunk_size), b""):
                    buf = b"".join((ID_DATA,
                                    self.swap32bits_to_bytes(len(data)),
                                    data))
                    # sendall(): send() may transmit only part of the
                    # buffer, which would corrupt the stream.
                    self.sock.sendall(buf)
        except Exception as exception:
            self.device.log.error("exception %s" % exception)
            raise

        msg = self.create_req(ID_DONE, int(time.time()))
        HdcHelper.write(self.sock, msg)
        result = HdcHelper.read(self.sock, DATA_UNIT_LENGTH * 2)
        if not self.check_result(result, ID_OKAY):
            self.device.log.error("exception %s" % result)
            raise HdcError(self.read_error_message(result))

    def read_mode(self, path):
        """
        Returns the mode of the remote file.

        Return an integer containing the mode if all went well, or
        INVALID_MODE_CODE when the reply is not a STAT reply.
        """
        msg = self.create_file_req(ID_STAT, path)
        HdcHelper.write(self.sock, msg)

        # read the result, in a byte array containing 4 ints
        stat_result = HdcHelper.read(self.sock, DATA_UNIT_LENGTH * 4)
        if not self.check_result(stat_result, ID_STAT):
            return INVALID_MODE_CODE

        return self.swap32bit_from_array(stat_result, DEFAULT_OFFSET_OF_INT)

    @staticmethod
    def _is_directory_mode(mode):
        """
        Tell whether a mode returned by read_mode denotes a directory.

        Replaces the former test on the decimal text of the mode
        (startswith("168")), which missed directories whose permission
        bits are below 0o640, e.g. 0o40555 == 16749. Only the low 16 bits
        are inspected, so INVALID_MODE_CODE (-1) is simply "not a
        directory".
        """
        return stat.S_ISDIR(mode & 0xFFFF)

    @staticmethod
    def _encode_remote_path(path):
        """
        Return path as bytes: DEFAULT_ENCODING first, UTF-8 as fallback.
        Values that are not str are returned unchanged.
        """
        if not isinstance(path, str):
            return path
        try:
            return path.encode(DEFAULT_ENCODING)
        except UnicodeEncodeError as _:
            return path.encode("UTF-8")

    def create_file_req(self, command, path):
        """
        Creates the data array for a file request. This creates an array with a
        4 byte command + the length of the path + the remote file name.

        Args:
        ------------
        command :
            the 4 byte command (ID_STAT, ID_RECV, ...)
        path : string or bytes
            The path of the remote file on which to execute the command.

        return:
        ------------
            return the bytes to send to the device through hdc
        """
        path = self._encode_remote_path(path)
        return b"".join((command, self.swap32bits_to_bytes(len(path)), path))

    def create_send_file_req(self, command, path, mode=0o644):
        """
        Creates a send request: command + length + path + ",<mode>".

        The mode is masked with FilePermission.mode_777 and written in
        decimal; the length field covers the path and the mode suffix.
        """
        # make the mode into a string
        mode_str = ",%s" % str(mode & FilePermission.mode_777)
        mode_content = mode_str.encode(DEFAULT_ENCODING)
        length = self.swap32bits_to_bytes(len(path) + len(mode_content))
        return b"".join((command, length, path, mode_content))

    def create_req(self, command, value):
        """
        Create a command with a code and an int values
        """
        return b"".join((command, self.swap32bits_to_bytes(value)))

    @staticmethod
    def check_result(result, code):
        """
        Checks the result array starts with the provided code

        Args:
        ------------
        result :
            the result array to check
        code :
            the 4 byte code

        return:
        ------------
        bool
            return true if the code matches
        """
        return result[0:4] == code[0:4]

    def read_error_message(self, result):
        """
        Reads an error message from the opened Socket.

        Args:
        ------------
        result :
            the current hdc result. Must contain both FAIL and the length of
            the message.

        return:
        ------------
            the message text, or None when result is not a FAIL reply or
            announces an empty message.
        """
        if self.check_result(result, ID_FAIL):
            length = self.swap32bit_from_array(result, 4)
            if length > 0:
                raw_message = HdcHelper.read(self.sock, length)
                # Decode instead of str(): str() on bytes yields "b'...'".
                return bytes(raw_message).decode(DEFAULT_ENCODING)

        return None

    @staticmethod
    def swap32bits_to_bytes(value):
        """
        Swaps an unsigned value around, and puts the result in an bytes that
        can be sent to a device.

        The low 32 bits of value are emitted least significant byte first.

        Args:
        ------------
        value :
            the value to swap.
        """
        return struct.pack("<I", value & 0xFFFFFFFF)

    @staticmethod
    def swap32bit_from_array(value, offset):
        """
        Reads an unsigned 32 bit integer (least significant byte first)
        from an array coming from a device.

        Args:
        ------------
        value :
            the array containing the int
        offset:
            the offset in the array at which the int starts

        Return:
        ------------
        int
            the integer read from the array

        Raises:
        ------------
        IndexError
            if value holds fewer than four bytes after offset; callers
            rely on this to detect short reads.
        """
        result = 0
        for index in range(4):
            result |= (int(value[offset + index]) & 0x000000FF) << (8 * index)
        return result
741
742
743class HdcHelper:
744    CONNECTOR_NAME = ""
745
746    @staticmethod
747    def check_if_hdc_running(timeout=30):
748        LOG.debug("Check if {} is running, timeout is {}s".format(
749            HdcHelper.CONNECTOR_NAME, timeout))
750        index = 1
751        while index < timeout:
752            if is_proc_running(HdcHelper.CONNECTOR_NAME):
753                return True
754            index = index + 1
755            time.sleep(1)
756        return False
757
758    @staticmethod
759    def push_file(device, local, remote, is_create=False,
760                  timeout=DEFAULT_TIMEOUT):
761        device.log.info("{} execute command: {} file send {} to {}".format(
762            convert_serial(device.device_sn), HdcHelper.CONNECTOR_NAME, local, remote))
763        HdcHelper._operator_file("file send", device, local, remote, timeout)
764
765    @staticmethod
766    def pull_file(device, remote, local, is_create=False,
767                  timeout=DEFAULT_TIMEOUT):
768        device.log.info("{} execute command: {} file recv {} to {}".format(
769            convert_serial(device.device_sn), HdcHelper.CONNECTOR_NAME, remote, local))
770        HdcHelper._operator_file("file recv", device, remote, local, timeout)
771
772    @staticmethod
773    def _install_remote_package(device, remote_file_path, command):
774        receiver = CollectingOutputReceiver()
775        cmd = "bm install -p %s %s" % (command.strip(), remote_file_path)
776        HdcHelper.execute_shell_command(device, cmd, INSTALL_TIMEOUT, receiver)
777        return receiver.output
778
779    @staticmethod
780    def install_package(device, package_file_path, command):
781        device.log.info("%s install %s" % (convert_serial(device.device_sn),
782                                           package_file_path))
783        remote_file_path = "/data/local/tmp/%s" % os.path.basename(
784            package_file_path)
785
786        HdcHelper.push_file(device, package_file_path, remote_file_path)
787
788        result = HdcHelper._install_remote_package(device, remote_file_path,
789                                                   command)
790        HdcHelper.execute_shell_command(device, "rm %s " % remote_file_path)
791        return result
792
793    @staticmethod
794    def uninstall_package(device, package_name):
795        receiver = CollectingOutputReceiver()
796        command = "bm uninstall -n %s " % package_name
797        device.log.info("%s %s" % (convert_serial(device.device_sn), command))
798        HdcHelper.execute_shell_command(device, command, INSTALL_TIMEOUT,
799                                        receiver)
800        return receiver.output
801
802    @staticmethod
803    def reboot(device, into=None):
804        device.log.info("{} execute command: {} target boot".format(convert_serial(device.device_sn),
805                                                                    HdcHelper.CONNECTOR_NAME))
806        with HdcHelper.socket(host=device.host, port=device.port) as sock:
807            HdcHelper.handle_shake(sock, device.device_sn)
808            request = HdcHelper.form_hdc_request("target boot")
809            HdcHelper.write(sock, request)
810
811    @staticmethod
812    def execute_shell_command(device, command, timeout=DEFAULT_TIMEOUT,
813                              receiver=None, **kwargs):
814        """
815        Executes a shell command on the device and retrieve the output.
816
817        Args:
818        ------------
819        device : IDevice
820            on which to execute the command.
821        command : string
822            the shell command to execute
823        timeout : int
824            max time between command output. If more time passes between
825            command output, the method will throw
826            ShellCommandUnresponsiveException (ms).
827        """
828        try:
829            if not timeout:
830                timeout = DEFAULT_TIMEOUT
831
832            with HdcHelper.socket(host=device.host, port=device.port,
833                                  timeout=timeout) as sock:
834                output_flag = kwargs.get("output_flag", True)
835                timeout_msg = " with timeout %ss" % str(timeout / 1000)
836                message = "{} execute command: {} shell {}{}".format(convert_serial(device.device_sn),
837                                                                     HdcHelper.CONNECTOR_NAME,
838                                                                     command, timeout_msg)
839                if output_flag:
840                    LOG.info(message)
841                else:
842                    LOG.debug(message)
843                from xdevice import Scheduler
844                HdcHelper.handle_shake(sock, device.device_sn)
845                request = HdcHelper.form_hdc_request("shell {}".format(command))
846                HdcHelper.write(sock, request)
847                resp = HdcResponse()
848                resp.okay = True
849                while True:
850                    len_buf = sock.recv(DATA_UNIT_LENGTH)
851                    if len_buf:
852                        length = struct.unpack("!I", len_buf)[0]
853                    else:
854                        break
855                    data = sock.recv(length)
856                    ret = HdcHelper.reply_to_string(data)
857                    if ret:
858                        if receiver:
859                            receiver.__read__(ret)
860                        else:
861                            LOG.debug(ret)
862                    if not Scheduler.is_execute:
863                        raise ExecuteTerminate()
864                return resp
865        except socket.timeout as error:
866            device.log.error("ShellCommandUnresponsiveException: {} shell {} timeout[{}S]".format(
867                convert_serial(device.device_sn), command, str(timeout / 1000)))
868            raise ShellCommandUnresponsiveException() from error
869        finally:
870            if receiver:
871                receiver.__done__()
872
873    @staticmethod
874    def set_device(device, sock):
875        """
876        Tells hdc to talk to a specific device
877        if the device is not -1, then we first tell hdc we're looking to talk
878        to a specific device
879        """
880        msg = "host:transport:%s" % device.device_sn
881        device_query = HdcHelper.form_hdc_request(msg)
882        HdcHelper.write(sock, device_query)
883        resp = HdcHelper.read_hdc_response(sock)
884        if not resp.okay:
885            raise HdcCommandRejectedException(resp.message)
886
887    @staticmethod
888    def form_hdc_request(req):
889        """
890        Create an ASCII string preceded by four hex digits.
891        """
892        try:
893            if not req.endswith('\0'):
894                req = "%s\0" % req
895            req = req.encode("utf-8")
896            fmt = "!I%ss" % len(req)
897            result = struct.pack(fmt, len(req), req)
898        except UnicodeEncodeError as ex:
899            LOG.error(ex)
900            raise ex
901        return result
902
903    @staticmethod
904    def read_hdc_response(sock, read_diag_string=False):
905        """
906        Reads the response from HDC after a command.
907
908        Args:
909        ------------
910        read_diag_string :
911            If true, we're expecting an OKAY response to be followed by a
912            diagnostic string. Otherwise, we only expect the diagnostic string
913            to follow a FAIL.
914        """
915        resp = HdcResponse()
916        reply = HdcHelper.read(sock, DATA_UNIT_LENGTH)
917        if HdcHelper.is_okay(reply):
918            resp.okay = True
919        else:
920            read_diag_string = True
921            resp.okay = False
922
923        while read_diag_string:
924            len_buf = HdcHelper.read(sock, DATA_UNIT_LENGTH)
925            len_str = HdcHelper.reply_to_string(len_buf)
926            msg = HdcHelper.read(sock, int(len_str, HEXADECIMAL_NUMBER))
927            resp.message = HdcHelper.reply_to_string(msg)
928            break
929
930        return resp
931
932    @staticmethod
933    def write(sock, req, timeout=10):
934        if isinstance(req, str):
935            req = req.encode(DEFAULT_ENCODING)
936        elif isinstance(req, list):
937            req = bytes(req)
938
939        start_time = time.time()
940        while req:
941            if time.time() - start_time > timeout:
942                LOG.debug("Socket write timeout, timeout:%ss" % timeout)
943                break
944
945            size = sock.send(req)
946            if size < 0:
947                raise DeviceError("channel EOF")
948
949            req = req[size:]
950            time.sleep(5 / 1000)
951
952    @staticmethod
953    def read(sock, length, timeout=10):
954        data = b''
955        recv_len = 0
956        start_time = time.time()
957        exc_num = 3
958        while length - recv_len > 0:
959            if time.time() - start_time > timeout:
960                LOG.debug("Socket read timeout, timout:%ss" % timeout)
961                break
962            try:
963                recv = sock.recv(length - recv_len)
964                if len(recv) > 0:
965                    time.sleep(5 / 1000)
966                else:
967                    break
968            except ConnectionResetError as error:
969                if exc_num <= 0:
970                    raise error
971                exc_num = exc_num - 1
972                recv = b''
973                time.sleep(1)
974                LOG.debug("ConnectionResetError occurs")
975
976            data += recv
977            recv_len += len(recv)
978
979        return data
980
981    @staticmethod
982    def is_okay(reply):
983        """
984        Checks to see if the first four bytes in "reply" are OKAY.
985        """
986        return reply[0:4] == ID_OKAY
987
988    @staticmethod
989    def reply_to_string(reply):
990        """
991        Converts an HDC reply to a string.
992        """
993        try:
994            return str(reply, encoding=DEFAULT_ENCODING)
995        except (ValueError, TypeError) as _:
996            return ""
997
998    @staticmethod
999    def socket(host=None, port=None, timeout=None):
1000        end = time.time() + 10 * 60
1001        sock = None
1002        hdc_connection = HdcMonitor.MONITOR_MAP.get(host, "127.0.0.1")
1003        while host not in HdcMonitor.MONITOR_MAP or \
1004                hdc_connection.main_hdc_connection is None:
1005            LOG.debug("Host: %s, port: %s, HdcMonitor map is %s" % (
1006                host, port, HdcMonitor.MONITOR_MAP))
1007            if host in HdcMonitor.MONITOR_MAP:
1008                LOG.debug("Monitor main hdc connection is %s" %
1009                          hdc_connection.main_hdc_connection)
1010            if time.time() > end:
1011                raise HdcError("Cannot detect HDC monitor!")
1012            time.sleep(2)
1013
1014        try:
1015            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1016            sock.connect((host, int(port)))
1017        except socket.error as exception:
1018            LOG.exception("Connect hdc server error: %s" % str(exception),
1019                          exc_info=False)
1020            raise exception
1021
1022        if sock is None:
1023            raise HdcError("Cannot connect hdc server!")
1024
1025        if timeout is not None:
1026            sock.setblocking(False)
1027            sock.settimeout(timeout / 1000)
1028
1029        return sock
1030
1031    @staticmethod
1032    def handle_shake(connection, connect_key=""):
1033        reply = HdcHelper.read(connection, 48)
1034        struct.unpack(">I12s32s", reply)
1035        banner_str = b'OHOS HDC'
1036        connect_key = connect_key.encode("utf-8")
1037        size = struct.calcsize('12s256s')
1038        fmt = "!I12s256s"
1039        pack_cmd = struct.pack(fmt, size, banner_str, connect_key)
1040        HdcHelper.write(connection, pack_cmd)
1041        return True
1042
1043    @staticmethod
1044    def _operator_file(command, device, local, remote, timeout):
1045        sock = HdcHelper.socket(host=device.host, port=device.port,
1046                                timeout=timeout)
1047        HdcHelper.handle_shake(sock, device.device_sn)
1048        request = HdcHelper.form_hdc_request(
1049            "%s %s %s" % (command, local, remote))
1050        HdcHelper.write(sock, request)
1051        reply = HdcHelper.read(sock, DATA_UNIT_LENGTH)
1052        length = struct.unpack("!I", reply)[0]
1053        data_buf = HdcHelper.read(sock, length)
1054        HdcHelper.reply_to_string(data_buf)
1055        LOG.debug("result %s" % data_buf)
1056
1057    @staticmethod
1058    def is_hdc_std():
1059        return HDC_STD_NAME in HdcHelper.CONNECTOR_NAME
1060
1061
class DeviceConnector(object):
    """
    Owns the HdcMonitor for one hdc server and relays its device events
    (connected, disconnected, changed) to the registered listeners.
    """
    # Nothing in this class assigns either attribute after definition.
    __instance = None
    __init_flag = False

    def __init__(self, host=None, port=None, usb_type=None):
        """
        Store the server address and record the connector name.

        `host` defaults to "127.0.0.1". `port` falls back to the
        OHOS_HDC_SERVER_PORT environment variable and then to
        DEFAULT_STD_PORT. The name returned by HdcMonitor.peek_hdc() is
        stored in HdcHelper.CONNECTOR_NAME.
        """
        if DeviceConnector.__init_flag:
            return
        self.device_listeners = []
        self.device_monitor = None
        self.monitor_lock = threading.Condition()
        self.host = host or "127.0.0.1"
        self.usb_type = usb_type
        HdcHelper.CONNECTOR_NAME = HdcMonitor.peek_hdc()
        self.port = int(
            port if port
            else os.getenv("OHOS_HDC_SERVER_PORT", DEFAULT_STD_PORT))

    def start(self):
        """Obtain the HdcMonitor for this host/port and start it."""
        monitor = HdcMonitor.get_instance(
            self.host, self.port, device_connector=self)
        self.device_monitor = monitor
        monitor.start()

    def terminate(self):
        """Stop the monitor, if any, and forget it."""
        monitor = self.device_monitor
        if monitor:
            monitor.stop()
        self.device_monitor = None

    def add_device_change_listener(self, device_change_listener):
        """Register a listener for device events."""
        self.device_listeners.append(device_change_listener)

    def remove_device_change_listener(self, device_change_listener):
        """Unregister a listener; unknown listeners are ignored."""
        if device_change_listener not in self.device_listeners:
            return
        self.device_listeners.remove(device_change_listener)

    def _log_if_foreign(self, device):
        """Log when the device's host/port differ from this connector's."""
        if device.host != self.host or device.port != self.port:
            LOG.debug("DeviceConnector device error")

    def _broadcast(self, callback_name, device):
        """Call the named callback with `device` on every listener."""
        for listener in self.device_listeners:
            getattr(listener, callback_name)(device)

    def device_connected(self, device):
        """Relay a device-connected event to all listeners."""
        LOG.debug("DeviceConnector device connected:host %s, port %s, "
                  "device sn %s " % (self.host, self.port, device.device_sn))
        self._log_if_foreign(device)
        self._broadcast("device_connected", device)

    def device_disconnected(self, device):
        """Relay a device-disconnected event to all listeners."""
        LOG.debug("DeviceConnector device disconnected:host %s, port %s, "
                  "device sn %s" % (self.host, self.port, device.device_sn))
        self._log_if_foreign(device)
        self._broadcast("device_disconnected", device)

    def device_changed(self, device):
        """Relay a device-changed event to all listeners."""
        LOG.debug("DeviceConnector device changed:host %s, port %s, "
                  "device sn %s" % (self.host, self.port, device.device_sn))
        self._log_if_foreign(device)
        self._broadcast("device_changed", device)
1121
1122
class CollectingOutputReceiver(IShellReceiver):
    """Shell receiver that accumulates everything it reads in `output`."""

    def __init__(self):
        self.output = ""

    def __read__(self, output):
        """Append `output` to the text collected so far."""
        self.output = "{}{}".format(self.output, output)

    def __error__(self, message):
        """Ignore error notifications."""

    def __done__(self, result_code="", message=""):
        """Ignore the completion notification."""
1135
1136
class DisplayOutputReceiver(IShellReceiver):
    """
    Shell receiver that collects all output in `output` and logs every
    completed, non-blank line at info level as it arrives.
    """

    def __init__(self):
        self.output = ""
        # Text after the last end mark, waiting for the rest of its line.
        self.unfinished_line = ""

    def _process_output(self, output, end_mark="\n"):
        """
        Split `output` into complete lines.

        Any text left over from the previous call is put in front of
        `output`. The part after the last `end_mark` is kept in
        `unfinished_line` for the next call and is not returned.
        """
        text = output
        if self.unfinished_line:
            text = "".join((self.unfinished_line, output))
            self.unfinished_line = ""
        pieces = text.split(end_mark)
        if not text.endswith(end_mark):
            # The last piece is an incomplete line; keep it for later.
            self.unfinished_line = pieces[-1]
        # The last piece is either empty or the saved incomplete line.
        return pieces[:-1]

    def __read__(self, output):
        """Collect `output` and log each completed non-blank line."""
        self.output = "{}{}".format(self.output, output)
        for raw_line in self._process_output(output):
            text = raw_line.strip()
            if text:
                LOG.info(text)

    def __error__(self, message):
        """Ignore error notifications."""

    def __done__(self, result_code="", message=""):
        """Ignore the completion notification."""
1170
1171
def process_command_ret(ret, receiver):
    """
    Deliver command output to a receiver, or to the debug log without one.

    Nothing happens when `ret` is an empty string. With a receiver, `ret`
    is passed to its __read__ followed by __done__. Without one, every
    non-blank line of `ret` is stripped and logged at debug level.

    Raises:
    ------------
    ReportException
        when the receiver raises while handling the output.
    """
    if ret == "":
        return

    if receiver:
        try:
            receiver.__read__(ret)
            receiver.__done__()
        except Exception as error:
            LOG.exception("Error generating log report.", exc_info=False)
            raise ReportException() from error
        return

    for raw_line in ret.split("\n"):
        text = raw_line.strip()
        if text:
            LOG.debug(text)
1187