• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import platform
21import socket
22import struct
23import threading
24import time
25import shutil
26import stat
27from dataclasses import dataclass
28
29from xdevice import DeviceOsType
30from xdevice import ReportException
31from xdevice import ExecuteTerminate
32from xdevice import platform_logger
33from xdevice import Plugin
34from xdevice import get_plugin
35from xdevice import IShellReceiver
36from xdevice import exec_cmd
37from xdevice import get_file_absolute_path
38from xdevice import FilePermission
39from xdevice import DeviceError
40from xdevice import HdcError
41from xdevice import HdcCommandRejectedException
42from xdevice import ShellCommandUnresponsiveException
43from xdevice import DeviceState
44from xdevice import convert_serial
45from xdevice import is_proc_running
46from xdevice import convert_ip
47from xdevice import create_dir
48
49ID_OKAY = b'OKAY'
50ID_FAIL = b'FAIL'
51ID_STAT = b'STAT'
52ID_RECV = b'RECV'
53ID_DATA = b'DATA'
54ID_DONE = b'DONE'
55ID_SEND = b'SEND'
56ID_LIST = b'LIST'
57ID_DENT = b'DENT'
58
59DEFAULT_ENCODING = "ISO-8859-1"
60SYNC_DATA_MAX = 64 * 1024
61REMOTE_PATH_MAX_LENGTH = 1024
62SOCK_DATA_MAX = 256
63
64INSTALL_TIMEOUT = 2 * 60 * 1000
65DEFAULT_TIMEOUT = 40 * 1000
66
67MAX_CONNECT_ATTEMPT_COUNT = 10
68DATA_UNIT_LENGTH = 4
69HEXADECIMAL_NUMBER = 16
70SPECIAL_FILE_MODE = 41471
71FORMAT_BYTES_LENGTH = 4
72DEFAULT_OFFSET_OF_INT = 4
73
74INVALID_MODE_CODE = -1
75DEFAULT_STD_PORT = 8710
76HDC_NAME = "hdc"
77HDC_STD_NAME = "hdc_std"
78LOG = platform_logger("Hdc")
79
80
81class HdcMonitor:
82    """
83    A Device monitor.
84    This monitor connects to the Device Connector, gets device and
85    debuggable process information from it.
86    """
87    MONITOR_MAP = {}
88
89    def __init__(self, host="127.0.0.1", port=None, device_connector=None):
90        self.channel = dict()
91        self.channel.setdefault("host", host)
92        self.channel.setdefault("port", port)
93        self.main_hdc_connection = None
94        self.connection_attempt = 0
95        self.is_stop = False
96        self.monitoring = False
97        self.server = device_connector
98        self.devices = []
99
100    @staticmethod
101    def get_instance(host, port=None, device_connector=None):
102        if host not in HdcMonitor.MONITOR_MAP:
103            monitor = HdcMonitor(host, port, device_connector)
104            HdcMonitor.MONITOR_MAP[host] = monitor
105            LOG.debug("HdcMonitor map add host %s, map is %s" %
106                      (host, HdcMonitor.MONITOR_MAP))
107
108        return HdcMonitor.MONITOR_MAP[host]
109
110    def start(self):
111        """
112        Starts the monitoring.
113        """
114        try:
115            connector_name = HDC_STD_NAME if HdcHelper.is_hdc_std() else HDC_NAME
116            self.init_hdc(connector_name)
117            server_thread = threading.Thread(target=self.loop_monitor,
118                                             name="HdcMonitor", args=())
119            server_thread.setDaemon(True)
120            server_thread.start()
121        except FileNotFoundError as _:
122            LOG.error("HdcMonitor can't find connector, init device "
123                      "environment failed!")
124
125    def init_hdc(self, connector_name=HDC_NAME):
126        env_hdc = shutil.which(connector_name)
127        # if not, add xdevice's own hdc path to environ path.
128        # tell if hdc has already been in the environ path.
129        if env_hdc is None:
130            LOG.error("Can not find {} or {} environment variable, "
131                      "please set it first!".format(HDC_NAME, HDC_STD_NAME))
132        if not is_proc_running(connector_name):
133            port = DEFAULT_STD_PORT
134            self.start_hdc(
135                connector=connector_name,
136                local_port=self.channel.setdefault(
137                    "port", port))
138            time.sleep(1)
139
140    def stop(self):
141        """
142        Stops the monitoring.
143        """
144        for host in HdcMonitor.MONITOR_MAP:
145            LOG.debug("HdcMonitor stop host %s" % host)
146            monitor = HdcMonitor.MONITOR_MAP[host]
147            try:
148                monitor.is_stop = True
149                if monitor.main_hdc_connection is not None:
150                    monitor.main_hdc_connection.shutdown(2)
151                    monitor.main_hdc_connection.close()
152                    monitor.main_hdc_connection = None
153            except (socket.error, socket.gaierror, socket.timeout) as _:
154                LOG.error("HdcMonitor close socket exception")
155        HdcMonitor.MONITOR_MAP.clear()
156        LOG.debug("HdcMonitor hdc monitor stop!")
157        LOG.debug("HdcMonitor map is %s" % HdcMonitor.MONITOR_MAP)
158
159    def loop_monitor(self):
160        """
161        Monitors the devices. This connects to the Debug Bridge
162        """
163        LOG.debug("current connector name is %s" % HdcHelper.CONNECTOR_NAME)
164        while not self.is_stop:
165            try:
166                if self.main_hdc_connection is None:
167                    self.main_hdc_connection = self.open_hdc_connection()
168                    if self.main_hdc_connection is None:
169                        self.connection_attempt += 1
170
171                        if self.connection_attempt > MAX_CONNECT_ATTEMPT_COUNT:
172                            self.is_stop = True
173                            LOG.error(
174                                "HdcMonitor attempt %s, can't connect to hdc "
175                                "for Device List Monitoring" %
176                                str(self.connection_attempt))
177                            raise HdcError(
178                                "HdcMonitor cannot connect hdc server(%s %s),"
179                                " please check!" %
180                                (self.channel.get("host"),
181                                 str(self.channel.get("post"))))
182
183                        LOG.debug(
184                            "HdcMonitor Connection attempts: %s" %
185                            str(self.connection_attempt))
186
187                        time.sleep(2)
188                    else:
189                        LOG.debug(
190                            "HdcMonitor Connected to hdc for device "
191                            "monitoring, main_hdc_connection is %s" %
192                            self.main_hdc_connection)
193
194                self.list_targets()
195            except (HdcError, Exception) as _:
196                self.handle_exception_monitor_loop()
197                break
198
199    def handle_exception_monitor_loop(self):
200        LOG.debug("Handle exception monitor loop: %s" %
201                  self.main_hdc_connection)
202        if self.main_hdc_connection is None:
203            return
204        self.main_hdc_connection.close()
205        LOG.debug("Handle exception monitor loop, main hdc connection closed, "
206                  "main hdc connection: %s" % self.main_hdc_connection)
207
208    def device_list_monitoring(self):
209        request = HdcHelper.form_hdc_request("host:track-devices")
210        HdcHelper.write(self.main_hdc_connection, request)
211        resp = HdcHelper.read_hdc_response(self.main_hdc_connection)
212        if not resp.okay:
213            LOG.error("HdcMonitor hdc rejected shell command")
214            raise Exception(resp.message)
215        else:
216            LOG.debug(
217                'HdcMonitor execute command success:send device_list '
218                'monitoring request')
219        return True
220
221    def _get_device_instance(self, items, os_type):
222        device = get_plugin(plugin_type=Plugin.DEVICE, plugin_id=os_type)[0]
223        device_instance = device.__class__()
224        device_instance.__set_serial__(items[0])
225        device_instance.host = self.channel.get("host")
226        device_instance.port = self.channel.get("port")
227        LOG.debug("Dmlib get device instance %s %s %s" %
228                  (device_instance.device_sn,
229                   device_instance.host, device_instance.port))
230        device_instance.device_state = DeviceState.get_state(items[1])
231        return device_instance
232
233    def update_devices(self, param_array_list):
234        devices = [item for item in self.devices]
235        devices.reverse()
236        for local_device1 in devices:
237            k = 0
238            for local_device2 in param_array_list:
239                if local_device1.device_sn == local_device2.device_sn and \
240                        local_device1.device_os_type == \
241                        local_device2.device_os_type:
242                    k = 1
243                    if local_device1.device_state != \
244                            local_device2.device_state:
245                        local_device1.device_state = local_device2.device_state
246                        self.server.device_changed(local_device1)
247                        param_array_list.remove(local_device2)
248                        break
249
250            if k == 0:
251                self.devices.remove(local_device1)
252                self.server.device_disconnected(local_device1)
253        for local_device in param_array_list:
254            self.devices.append(local_device)
255            self.server.device_connected(local_device)
256
257    def open_hdc_connection(self):
258        """
259        Attempts to connect to the debug bridge server. Return a connect socket
260        if success, null otherwise.
261        """
262        sock = None
263        try:
264            LOG.debug(
265                "HdcMonitor connecting to hdc for Device List Monitoring")
266            LOG.debug("HdcMonitor socket connection host: %s, port: %s" %
267                      (str(convert_ip(self.channel.get("host"))),
268                       str(int(self.channel.get("port")))))
269
270            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
271            sock.connect((self.channel.get("host"),
272                          int(self.channel.get("port"))))
273            return sock
274
275        except (socket.error, socket.gaierror, socket.timeout) as exception:
276            LOG.error("HdcMonitor hdc socket connection Error: %s, "
277                      "host is %s, port is %s" % (str(exception),
278                                                  self.channel.get("host"),
279                                                  self.channel.get("port")))
280            return sock
281
282    @classmethod
283    def start_hdc(cls, connector=HDC_NAME, kill=False, local_port=None):
284        """Starts the hdc host side server.
285
286        Args:
287            connector: connector type, like "hdc"
288            kill: if True, kill exist host side server
289            local_port: local port to start host side server
290
291        Returns:
292            None
293        """
294        if kill:
295            LOG.debug("HdcMonitor {} kill".format(connector))
296            exec_cmd([connector, "kill"])
297        LOG.debug("HdcMonitor {} start".format(connector))
298        exec_cmd(
299            [connector, "-l5", "start"],
300            error_print=False, redirect=True)
301
302    def list_targets(self):
303        if self.main_hdc_connection and not self.monitoring:
304            self.monitoring_list_targets()
305            len_buf = HdcHelper.read(self.main_hdc_connection,
306                                     DATA_UNIT_LENGTH)
307            length = struct.unpack("!I", len_buf)[0]
308            LOG.debug("had received length is: %s" % length)
309            if length >= 8:
310                self.connection_attempt = 0
311                self.monitoring = True
312                self.process_incoming_target_data(length)
313
314    def monitoring_list_targets(self):
315        HdcHelper.handle_shake(self.main_hdc_connection)
316        request = HdcHelper.form_hdc_request('list targets')
317        HdcHelper.write(self.main_hdc_connection, request)
318
319    def process_incoming_target_data(self, length):
320        local_array_list = []
321        data_buf = HdcHelper.read(self.main_hdc_connection, length)
322        data_str = HdcHelper.reply_to_string(data_buf)
323        if 'Empty' not in data_str:
324            lines = data_str.split('\n')
325            for line in lines:
326                items = line.strip().split('\t')
327                if not items[0]:
328                    continue
329                items.append(DeviceState.ONLINE.value)
330                device_instance = self._get_device_instance(
331                    items, DeviceOsType.default)
332                local_array_list.append(device_instance)
333        else:
334            LOG.debug("please check device actually.[%s]" % data_str.strip())
335        self.update_devices(local_array_list)
336
337    @staticmethod
338    def peek_hdc():
339        LOG.debug("Peek running process to check expect connector.")
340        # if not find hdc_std, try find hdc
341        connector_name = HDC_STD_NAME
342        env_hdc = shutil.which(connector_name)
343        # if not, add xdevice's own hdc path to environ path.
344        # tell if hdc has already been in the environ path.
345        if env_hdc is None:
346            connector_name = HDC_NAME
347        LOG.debug("Peak end")
348        return connector_name
349
350
351@dataclass
352class HdcResponse:
353    """Response from HDC."""
354    okay = ID_OKAY  # first 4 bytes in response were "OKAY"?
355    message = ""  # diagnostic string if okay is false
356
357
358class SyncService:
359    """
360    Sync service class to push/pull to/from devices/emulators,
361    through the debug bridge.
362    """
363
364    def __init__(self, device, host=None, port=None):
365        self.device = device
366        self.host = host
367        self.port = port
368        self.sock = None
369
370    def open_sync(self, timeout=DEFAULT_TIMEOUT):
371        """
372        Opens the sync connection. This must be called before any calls to
373        push[File] / pull[File].
374        Return true if the connection opened, false if hdc refuse the
375        connection. This can happen device is invalid.
376        """
377        LOG.debug("Open sync, timeout=%s" % int(timeout / 1000))
378        self.sock = HdcHelper.socket(host=self.host, port=self.port,
379                                     timeout=timeout)
380        HdcHelper.set_device(self.device, self.sock)
381
382        request = HdcHelper.form_hdc_request("sync:")
383        HdcHelper.write(self.sock, request)
384
385        resp = HdcHelper.read_hdc_response(self.sock)
386        if not resp.okay:
387            self.device.log.error(
388                "Got unhappy response from HDC sync req: %s" % resp.message)
389            raise HdcError(
390                "Got unhappy response from HDC sync req: %s" % resp.message)
391
392    def close(self):
393        """
394        Closes the connection.
395        """
396        if self.sock is not None:
397            try:
398                self.sock.close()
399            except socket.error as error:
400                LOG.error("Socket close error: %s" % error, error_no="00420")
401            finally:
402                self.sock = None
403
404    def pull_file(self, remote, local, is_create=False):
405        """
406        Pulls a file.
407        The top directory won't be created if is_create is False (by default)
408        and vice versa
409        """
410        mode = self.read_mode(remote)
411        self.device.log.debug("Remote file %s mode is %d" % (remote, mode))
412        if mode == 0:
413            raise HdcError("Remote object doesn't exist!")
414
415        if str(mode).startswith("168"):
416            if is_create:
417                remote_file_split = os.path.split(remote)[-1] \
418                    if os.path.split(remote)[-1] else os.path.split(remote)[-2]
419                remote_file_basename = os.path.basename(remote_file_split)
420                new_local = os.path.join(local, remote_file_basename)
421                create_dir(new_local)
422            else:
423                new_local = local
424
425            collect_receiver = CollectingOutputReceiver()
426            HdcHelper.execute_shell_command(self.device, "ls %s" % remote,
427                                            receiver=collect_receiver)
428            files = collect_receiver.output.split()
429            for file_name in files:
430                self.pull_file("%s/%s" % (remote, file_name),
431                               new_local, is_create=True)
432        elif mode == SPECIAL_FILE_MODE:
433            self.device.log.info("skipping special file '%s'" % remote)
434        else:
435            if os.path.isdir(local):
436                local = os.path.join(local, os.path.basename(remote))
437
438            self.do_pull_file(remote, local)
439
440    def do_pull_file(self, remote, local):
441        """
442        Pulls a remote file
443        """
444        self.device.log.info(
445            "%s pull %s to %s" % (convert_serial(self.device.device_sn),
446                                  remote, local))
447        remote_path_content = remote.encode(DEFAULT_ENCODING)
448        if len(remote_path_content) > REMOTE_PATH_MAX_LENGTH:
449            raise HdcError("Remote path is too long.")
450
451        msg = self.create_file_req(ID_RECV, remote_path_content)
452        HdcHelper.write(self.sock, msg)
453        pull_result = HdcHelper.read(self.sock, DATA_UNIT_LENGTH * 2)
454        if not self.check_result(pull_result, ID_DATA) and \
455                not self.check_result(pull_result, ID_DONE):
456            raise HdcError(self.read_error_message(pull_result))
457        if platform.system() == "Windows":
458            flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND | os.O_BINARY
459        else:
460            flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND
461        pulled_file_open = os.open(local, flags, FilePermission.mode_755)
462        with os.fdopen(pulled_file_open, "wb") as pulled_file:
463            while True:
464                if self.check_result(pull_result, ID_DONE):
465                    break
466
467                if not self.check_result(pull_result, ID_DATA):
468                    raise HdcError(self.read_error_message(pull_result))
469
470                try:
471                    length = self.swap32bit_from_array(
472                        pull_result, DEFAULT_OFFSET_OF_INT)
473                except IndexError as index_error:
474                    self.device.log.debug("do_pull_file: %s" %
475                                          str(pull_result))
476                    if pull_result == ID_DATA:
477                        pull_result = self.sock.recv(DATA_UNIT_LENGTH)
478                        self.device.log.debug(
479                            "do_pull_file: %s" % str(pull_result))
480                        length = self.swap32bit_from_array(pull_result, 0)
481                        self.device.log.debug("do_pull_file: %s" % str(length))
482                    else:
483                        raise IndexError(str(index_error))
484
485                if length > SYNC_DATA_MAX:
486                    raise HdcError("Receiving too much data.")
487
488                pulled_file.write(HdcHelper.read(self.sock, length))
489                pulled_file.flush()
490                pull_result = self.sock.recv(DATA_UNIT_LENGTH * 2)
491
492    def push_file(self, local, remote, is_create=False):
493        """
494        Push a single file.
495        The top directory won't be created if is_create is False (by default)
496        and vice versa
497        """
498        if not os.path.exists(local):
499            raise HdcError("Local path doesn't exist.")
500
501        if os.path.isdir(local):
502            if is_create:
503                local_file_split = os.path.split(local)[-1] \
504                    if os.path.split(local)[-1] else os.path.split(local)[-2]
505                local_file_basename = os.path.basename(local_file_split)
506                remote = "{}/{}".format(
507                    remote, local_file_basename)
508                HdcHelper.execute_shell_command(
509                    self.device, "mkdir -p %s" % remote)
510
511            for child in os.listdir(local):
512                file_path = os.path.join(local, child)
513                if os.path.isdir(file_path):
514                    self.push_file(
515                        file_path, "%s/%s" % (remote, child),
516                        is_create=False)
517                else:
518                    self.do_push_file(file_path, "%s/%s" % (remote, child))
519        else:
520            self.do_push_file(local, remote)
521
522    def do_push_file(self, local, remote):
523        """
524        Push a single file
525
526        Args:
527        ------------
528        local : string
529            the local file to push
530        remote : string
531            the remote file (length max is 1024)
532        """
533        mode = self.read_mode(remote)
534        self.device.log.debug("Remote file %s mode is %d" % (remote, mode))
535        self.device.log.debug("%s execute command: hdc push %s %s" % (
536            convert_serial(self.device.device_sn), local, remote))
537        if str(mode).startswith("168"):
538            remote = "%s/%s" % (remote, os.path.basename(local))
539
540        try:
541            try:
542                remote_path_content = remote.encode(DEFAULT_ENCODING)
543            except UnicodeEncodeError as _:
544                remote_path_content = remote.encode("UTF-8")
545            if len(remote_path_content) > REMOTE_PATH_MAX_LENGTH:
546                raise HdcError("Remote path is too long.")
547
548            # create the header for the action
549            # and send it. We use a custom try/catch block to make the
550            # difference between file and network IO exceptions.
551            msg = self.create_send_file_req(ID_SEND, remote_path_content,
552                                            FilePermission.mode_644)
553
554            HdcHelper.write(self.sock, msg)
555            flags = os.O_RDONLY
556            modes = stat.S_IWUSR | stat.S_IRUSR
557            with os.fdopen(os.open(local, flags, modes), "rb") as test_file:
558                while True:
559                    if platform.system() == "Linux":
560                        data = test_file.read(1024 * 4)
561                    else:
562                        data = test_file.read(SYNC_DATA_MAX)
563
564                    if not data:
565                        break
566
567                    buf = struct.pack(
568                        "%ds%ds%ds" % (len(ID_DATA), FORMAT_BYTES_LENGTH,
569                                       len(data)), ID_DATA,
570                        self.swap32bits_to_bytes(len(data)), data)
571                    self.sock.send(buf)
572        except Exception as exception:
573            self.device.log.error("exception %s" % exception)
574            raise exception
575
576        msg = self.create_req(ID_DONE, int(time.time()))
577        HdcHelper.write(self.sock, msg)
578        result = HdcHelper.read(self.sock, DATA_UNIT_LENGTH * 2)
579        if not self.check_result(result, ID_OKAY):
580            self.device.log.error("exception %s" % result)
581            raise HdcError(self.read_error_message(result))
582
583    def read_mode(self, path):
584        """
585        Returns the mode of the remote file.
586        Return an Integer containing the mode if all went well or null
587        """
588        msg = self.create_file_req(ID_STAT, path)
589        HdcHelper.write(self.sock, msg)
590
591        # read the result, in a byte array containing 4 ints
592        stat_result = HdcHelper.read(self.sock, DATA_UNIT_LENGTH * 4)
593        if not self.check_result(stat_result, ID_STAT):
594            return INVALID_MODE_CODE
595
596        return self.swap32bit_from_array(stat_result, DEFAULT_OFFSET_OF_INT)
597
598    def create_file_req(self, command, path):
599        """
600        Creates the data array for a file request. This creates an array with a
601        4 byte command + the remote file name.
602
603        Args:
604        ------------
605        command :
606            the 4 byte command (ID_STAT, ID_RECV, ...)
607        path : string
608            The path, as a byte array, of the remote file on which to execute
609            the command.
610
611        return:
612        ------------
613            return the byte[] to send to the device through hdc
614        """
615        if isinstance(path, str):
616            try:
617                path = path.encode(DEFAULT_ENCODING)
618            except UnicodeEncodeError as _:
619                path = path.encode("UTF-8")
620
621        return struct.pack(
622            "%ds%ds%ds" % (len(command), FORMAT_BYTES_LENGTH, len(path)),
623            command, self.swap32bits_to_bytes(len(path)), path)
624
625    def create_send_file_req(self, command, path, mode=0o644):
626        # make the mode into a string
627        mode_str = ",%s" % str(mode & FilePermission.mode_777)
628        mode_content = mode_str.encode(DEFAULT_ENCODING)
629        return struct.pack(
630            "%ds%ds%ds%ds" % (len(command), FORMAT_BYTES_LENGTH, len(path),
631                              len(mode_content)),
632            command, self.swap32bits_to_bytes(len(path) + len(mode_content)),
633            path, mode_content)
634
635    def create_req(self, command, value):
636        """
637        Create a command with a code and an int values
638        """
639        return struct.pack("%ds%ds" % (len(command), FORMAT_BYTES_LENGTH),
640                           command, self.swap32bits_to_bytes(value))
641
642    @staticmethod
643    def check_result(result, code):
644        """
645        Checks the result array starts with the provided code
646
647        Args:
648        ------------
649        result :
650            the result array to check
651        path : string
652            the 4 byte code
653
654        return:
655        ------------
656        bool
657            return true if the code matches
658        """
659        return result[0:4] == code[0:4]
660
661    def read_error_message(self, result):
662        """
663        Reads an error message from the opened Socket.
664
665        Args:
666        ------------
667        result :
668            the current hdc result. Must contain both FAIL and the length of
669            the message.
670        """
671        if self.check_result(result, ID_FAIL):
672            length = self.swap32bit_from_array(result, 4)
673            if length > 0:
674                return str(HdcHelper.read(self.sock, length))
675
676        return None
677
678    @staticmethod
679    def swap32bits_to_bytes(value):
680        """
681        Swaps an unsigned value around, and puts the result in an bytes that
682        can be sent to a device.
683
684        Args:
685        ------------
686        value :
687            the value to swap.
688        """
689        return bytes([value & 0x000000FF,
690                      (value & 0x0000FF00) >> 8,
691                      (value & 0x00FF0000) >> 16,
692                      (value & 0xFF000000) >> 24])
693
694    @staticmethod
695    def swap32bit_from_array(value, offset):
696        """
697        Reads a signed 32 bit integer from an array coming from a device.
698
699        Args:
700        ------------
701        value :
702            the array containing the int
703        offset:
704            the offset in the array at which the int starts
705
706        Return:
707        ------------
708        int
709            the integer read from the array
710        """
711        result = 0
712        result |= (int(value[offset])) & 0x000000FF
713        result |= (int(value[offset + 1]) & 0x000000FF) << 8
714        result |= (int(value[offset + 2]) & 0x000000FF) << 16
715        result |= (int(value[offset + 3]) & 0x000000FF) << 24
716
717        return result
718
719
720class HdcHelper:
721    CONNECTOR_NAME = ""
722
723    @staticmethod
724    def push_file(device, local, remote, is_create=False,
725                  timeout=DEFAULT_TIMEOUT):
726        device.log.info("{} execute command: {} file send {} to {}".format(convert_serial(device.device_sn),
727                                                                           HdcHelper.CONNECTOR_NAME,
728                                                                           local, remote))
729        HdcHelper._operator_file("file send", device, local, remote, timeout)
730
731    @staticmethod
732    def pull_file(device, remote, local, is_create=False,
733                  timeout=DEFAULT_TIMEOUT):
734        device.log.info("{} execute command: {} file recv {} to {}".format(convert_serial(device.device_sn),
735                                                                           HdcHelper.CONNECTOR_NAME,
736                                                                           remote, local))
737        HdcHelper._operator_file("file recv", device, remote, local, timeout)
738
739    @staticmethod
740    def _install_remote_package(device, remote_file_path, command):
741        receiver = CollectingOutputReceiver()
742        cmd = "bm install %s %s" % (command.strip(), remote_file_path)
743        HdcHelper.execute_shell_command(device, cmd, INSTALL_TIMEOUT, receiver)
744        return receiver.output
745
746    @staticmethod
747    def install_package(device, package_file_path, command):
748        device.log.info("%s install %s" % (convert_serial(device.device_sn),
749                                           package_file_path))
750        remote_file_path = "/data/local/tmp/%s" % os.path.basename(
751            package_file_path)
752
753        service = None
754        try:
755            service = SyncService(device, host=device.host, port=device.port)
756            service.open_sync()
757            service.push_file(package_file_path, remote_file_path)
758        finally:
759            if service is not None:
760                service.close()
761
762        result = HdcHelper._install_remote_package(device, remote_file_path,
763                                                   command)
764        HdcHelper.execute_shell_command(device, "rm %s " % remote_file_path)
765        return result
766
767    @staticmethod
768    def uninstall_package(device, package_name):
769        receiver = CollectingOutputReceiver()
770        command = "bm uninstall -n %s " % package_name
771        device.log.info("%s %s" % (convert_serial(device.device_sn), command))
772        HdcHelper.execute_shell_command(device, command, INSTALL_TIMEOUT,
773                                        receiver)
774        return receiver.output
775
776    @staticmethod
777    def reboot(device, into=None):
778        device.log.info("{} execute command: {} target boot".format(convert_serial(device.device_sn),
779                                                                    HdcHelper.CONNECTOR_NAME))
780        with HdcHelper.socket(host=device.host, port=device.port) as sock:
781            HdcHelper.handle_shake(sock, device.device_sn)
782            request = HdcHelper.form_hdc_request("target boot")
783            HdcHelper.write(sock, request)
784
785    @staticmethod
786    def execute_shell_command(device, command, timeout=DEFAULT_TIMEOUT,
787                              receiver=None, **kwargs):
788        """
789        Executes a shell command on the device and retrieve the output.
790
791        Args:
792        ------------
793        device : IDevice
794            on which to execute the command.
795        command : string
796            the shell command to execute
797        timeout : int
798            max time between command output. If more time passes between
799            command output, the method will throw
800            ShellCommandUnresponsiveException (ms).
801        """
802        try:
803            if not timeout:
804                timeout = DEFAULT_TIMEOUT
805            with HdcHelper.socket(host=device.host, port=device.port,
806                                  timeout=timeout) as sock:
807                output_flag = kwargs.get("output_flag", True)
808                timeout_msg = " with timeout %ss" % str(timeout / 1000)
809                message = "{} execute command: {} shell {}{}".format(convert_serial(device.device_sn),
810                                                                     HdcHelper.CONNECTOR_NAME,
811                                                                     command, timeout_msg)
812                if output_flag:
813                    LOG.info(message)
814                else:
815                    LOG.debug(message)
816                from xdevice import Scheduler
817                HdcHelper.handle_shake(sock, device.device_sn)
818                request = HdcHelper.form_hdc_request("shell {}".format(command))
819                HdcHelper.write(sock, request)
820                resp = HdcResponse()
821                resp.okay = True
822                while True:
823                    len_buf = sock.recv(DATA_UNIT_LENGTH)
824                    if len_buf:
825                        length = struct.unpack("!I", len_buf)[0]
826                    else:
827                        break
828                    data = sock.recv(length)
829                    ret = HdcHelper.reply_to_string(data)
830                    if ret:
831                        if receiver:
832                            receiver.__read__(ret)
833                        else:
834                            LOG.debug(ret)
835                    if not Scheduler.is_execute:
836                        raise ExecuteTerminate()
837                return resp
838        except socket.timeout as _:
839            device.log.error("ShellCommandUnresponsiveException: %s shell %s timeout[%sS]" % (
840                convert_serial(device.device_sn), command, str(timeout / 1000)))
841            raise ShellCommandUnresponsiveException()
842        finally:
843            if receiver:
844                receiver.__done__()
845
846    @staticmethod
847    def set_device(device, sock):
848        """
849        Tells hdc to talk to a specific device
850        if the device is not -1, then we first tell hdc we're looking to talk
851        to a specific device
852        """
853        msg = "host:transport:%s" % device.device_sn
854        device_query = HdcHelper.form_hdc_request(msg)
855        HdcHelper.write(sock, device_query)
856        resp = HdcHelper.read_hdc_response(sock)
857        if not resp.okay:
858            raise HdcCommandRejectedException(resp.message)
859
860    @staticmethod
861    def form_hdc_request(req):
862        """
863        Create an ASCII string preceded by four hex digits.
864        """
865        try:
866            if not req.endswith('\0'):
867                req = "%s\0" % req
868            req = req.encode("utf-8")
869            fmt = "!I%ss" % len(req)
870            result = struct.pack(fmt, len(req), req)
871        except UnicodeEncodeError as ex:
872            LOG.error(ex)
873            raise ex
874        return result
875
876    @staticmethod
877    def read_hdc_response(sock, read_diag_string=False):
878        """
879        Reads the response from HDC after a command.
880
881        Args:
882        ------------
883        read_diag_string :
884            If true, we're expecting an OKAY response to be followed by a
885            diagnostic string. Otherwise, we only expect the diagnostic string
886            to follow a FAIL.
887        """
888        resp = HdcResponse()
889        reply = HdcHelper.read(sock, DATA_UNIT_LENGTH)
890        if HdcHelper.is_okay(reply):
891            resp.okay = True
892        else:
893            read_diag_string = True
894            resp.okay = False
895
896        while read_diag_string:
897            len_buf = HdcHelper.read(sock, DATA_UNIT_LENGTH)
898            len_str = HdcHelper.reply_to_string(len_buf)
899            msg = HdcHelper.read(sock, int(len_str, HEXADECIMAL_NUMBER))
900            resp.message = HdcHelper.reply_to_string(msg)
901            break
902
903        return resp
904
905    @staticmethod
906    def write(sock, req, timeout=10):
907        if isinstance(req, str):
908            req = req.encode(DEFAULT_ENCODING)
909        elif isinstance(req, list):
910            req = bytes(req)
911
912        start_time = time.time()
913        while req:
914            if time.time() - start_time > timeout:
915                LOG.debug("Socket write timeout, timeout:%ss" % timeout)
916                break
917
918            size = sock.send(req)
919            if size < 0:
920                raise DeviceError("channel EOF")
921
922            req = req[size:]
923            time.sleep(5 / 1000)
924
925    @staticmethod
926    def read(sock, length, timeout=10):
927        data = b''
928        recv_len = 0
929        start_time = time.time()
930        exc_num = 3
931        while length - recv_len > 0:
932            if time.time() - start_time > timeout:
933                LOG.debug("Socket read timeout, timout:%ss" % timeout)
934                break
935            try:
936                recv = sock.recv(length - recv_len)
937                if len(recv) > 0:
938                    time.sleep(5 / 1000)
939                else:
940                    break
941            except ConnectionResetError as error:
942                if exc_num <= 0:
943                    raise error
944                exc_num = exc_num - 1
945                recv = b''
946                time.sleep(1)
947                LOG.debug("ConnectionResetError occurs")
948
949            data += recv
950            recv_len += len(recv)
951
952        return data
953
954    @staticmethod
955    def is_okay(reply):
956        """
957        Checks to see if the first four bytes in "reply" are OKAY.
958        """
959        return reply[0:4] == ID_OKAY
960
961    @staticmethod
962    def reply_to_string(reply):
963        """
964        Converts an HDC reply to a string.
965        """
966        try:
967            return str(reply, encoding=DEFAULT_ENCODING)
968        except (ValueError, TypeError) as _:
969            return ""
970
971    @staticmethod
972    def socket(host=None, port=None, timeout=None):
973        end = time.time() + 10 * 60
974        sock = None
975
976        while host not in HdcMonitor.MONITOR_MAP or \
977                HdcMonitor.MONITOR_MAP[host].main_hdc_connection is None:
978            LOG.debug("Host: %s, port: %s, HdcMonitor map is %s" % (
979                host, port, HdcMonitor.MONITOR_MAP))
980            if host in HdcMonitor.MONITOR_MAP:
981                LOG.debug("Monitor main hdc connection is %s" %
982                          HdcMonitor.MONITOR_MAP[host].main_hdc_connection)
983            if time.time() > end:
984                raise HdcError("Cannot detect HDC monitor!")
985            time.sleep(2)
986
987        try:
988            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
989            sock.connect((host, int(port)))
990        except socket.error as exception:
991            LOG.exception("Connect hdc server error: %s" % str(exception),
992                          exc_info=False)
993            raise exception
994
995        if sock is None:
996            raise HdcError("Cannot connect hdc server!")
997
998        if timeout is not None:
999            sock.setblocking(False)
1000            sock.settimeout(timeout / 1000)
1001
1002        return sock
1003
1004    @staticmethod
1005    def handle_shake(connection, connect_key=""):
1006        reply = HdcHelper.read(connection, 48)
1007        struct.unpack(">I12s32s", reply)
1008        banner_str = b'OHOS HDC'
1009        connect_key = connect_key.encode("utf-8")
1010        size = struct.calcsize('12s256s')
1011        fmt = "!I12s256s"
1012        pack_cmd = struct.pack(fmt, size, banner_str, connect_key)
1013        HdcHelper.write(connection, pack_cmd)
1014        return True
1015
1016    @staticmethod
1017    def _operator_file(command, device, local, remote, timeout):
1018        sock = HdcHelper.socket(host=device.host, port=device.port,
1019                                timeout=timeout)
1020        HdcHelper.handle_shake(sock, device.device_sn)
1021        request = HdcHelper.form_hdc_request(
1022            "%s %s %s" % (command, local, remote))
1023        HdcHelper.write(sock, request)
1024        reply = HdcHelper.read(sock, DATA_UNIT_LENGTH)
1025        length = struct.unpack("!I", reply)[0]
1026        data_buf = HdcHelper.read(sock, length)
1027        HdcHelper.reply_to_string(data_buf)
1028        LOG.debug("result is {}".format(data_buf))
1029
1030    @staticmethod
1031    def is_hdc_std():
1032        return HDC_STD_NAME in HdcHelper.CONNECTOR_NAME
1033
1034
1035class DeviceConnector(object):
1036    __instance = None
1037    __init_flag = False
1038
1039    def __init__(self, host=None, port=None, usb_type=None):
1040        if DeviceConnector.__init_flag:
1041            return
1042        self.device_listeners = []
1043        self.device_monitor = None
1044        self.host = host if host else "127.0.0.1"
1045        self.usb_type = usb_type
1046        connector_name = HdcMonitor.peek_hdc()
1047        HdcHelper.CONNECTOR_NAME = connector_name
1048        if port:
1049            self.port = int(port)
1050        else:
1051            self.port = int(os.getenv("HDC_SERVER_PORT", DEFAULT_STD_PORT))
1052
1053    def start(self):
1054        self.device_monitor = HdcMonitor.get_instance(
1055            self.host, self.port, device_connector=self)
1056        self.device_monitor.start()
1057
1058    def terminate(self):
1059        if self.device_monitor:
1060            self.device_monitor.stop()
1061        self.device_monitor = None
1062
1063    def add_device_change_listener(self, device_change_listener):
1064        self.device_listeners.append(device_change_listener)
1065
1066    def remove_device_change_listener(self, device_change_listener):
1067        if device_change_listener in self.device_listeners:
1068            self.device_listeners.remove(device_change_listener)
1069
1070    def device_connected(self, device):
1071        LOG.debug("DeviceConnector device connected:host %s, port %s, "
1072                  "device sn %s " % (self.host, self.port, device.device_sn))
1073        if device.host != self.host or device.port != self.port:
1074            LOG.debug("DeviceConnector device error")
1075        for listener in self.device_listeners:
1076            listener.device_connected(device)
1077
1078    def device_disconnected(self, device):
1079        LOG.debug("DeviceConnector device disconnected:host %s, port %s, "
1080                  "device sn %s" % (self.host, self.port, device.device_sn))
1081        if device.host != self.host or device.port != self.port:
1082            LOG.debug("DeviceConnector device error")
1083        for listener in self.device_listeners:
1084            listener.device_disconnected(device)
1085
1086    def device_changed(self, device):
1087        LOG.debug("DeviceConnector device changed:host %s, port %s, "
1088                  "device sn %s" % (self.host, self.port, device.device_sn))
1089        if device.host != self.host or device.port != self.port:
1090            LOG.debug("DeviceConnector device error")
1091        for listener in self.device_listeners:
1092            listener.device_changed(device)
1093
1094
1095class CollectingOutputReceiver(IShellReceiver):
1096    def __init__(self):
1097        self.output = ""
1098
1099    def __read__(self, output):
1100        self.output = "%s%s" % (self.output, output)
1101
1102    def __error__(self, message):
1103        pass
1104
1105    def __done__(self, result_code="", message=""):
1106        pass
1107
1108
1109class DisplayOutputReceiver(IShellReceiver):
1110    def __init__(self):
1111        self.output = ""
1112        self.unfinished_line = ""
1113
1114    def _process_output(self, output, end_mark="\n"):
1115        content = output
1116        if self.unfinished_line:
1117            content = "".join((self.unfinished_line, content))
1118            self.unfinished_line = ""
1119        lines = content.split(end_mark)
1120        if content.endswith(end_mark):
1121            # get rid of the tail element of this list contains empty str
1122            return lines[:-1]
1123        else:
1124            self.unfinished_line = lines[-1]
1125            # not return the tail element of this list contains unfinished str,
1126            # so we set position -1
1127            return lines[:-1]
1128
1129    def __read__(self, output):
1130        self.output = "%s%s" % (self.output, output)
1131        lines = self._process_output(output)
1132        for line in lines:
1133            line = line.strip()
1134            if line:
1135                LOG.info(line)
1136
1137    def __error__(self, message):
1138        pass
1139
1140    def __done__(self, result_code="", message=""):
1141        pass
1142
1143
1144def process_command_ret(ret, receiver):
1145    try:
1146        if ret != "" and receiver:
1147            receiver.__read__(ret)
1148            receiver.__done__()
1149    except Exception as _:
1150        LOG.exception("Error generating log report.", exc_info=False)
1151        raise ReportException()
1152
1153    if ret != "" and not receiver:
1154        lines = ret.split("\n")
1155        for line in lines:
1156            line = line.strip()
1157            if line:
1158                LOG.debug(line)
1159