• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2021 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import platform
21import socket
22import struct
23import threading
24import time
25import shutil
26import stat
27from dataclasses import dataclass
28
29from xdevice import DeviceOsType
30from xdevice import ReportException
31from xdevice import ExecuteTerminate
32from xdevice import platform_logger
33from xdevice import Plugin
34from xdevice import get_plugin
35from xdevice import IShellReceiver
36from xdevice import exec_cmd
37from xdevice import get_file_absolute_path
38from xdevice import ParamError
39
40from xdevice_extension._core.environment.device_state import DeviceState
41from xdevice_extension._core.exception import DeviceError
42from xdevice_extension._core.exception import HdcError
43from xdevice_extension._core.exception import HdcCommandRejectedException
44from xdevice_extension._core.exception import ShellCommandUnresponsiveException
45from xdevice_extension._core.utils import is_proc_running
46from xdevice_extension._core.utils import convert_ip
47from xdevice_extension._core.utils import convert_serial
48from xdevice_extension._core.utils import create_dir
49from xdevice_extension._core.constants import DeviceConnectorType
50from xdevice_extension._core.constants import FilePermission
51
52ID_OKAY = b'OKAY'
53ID_FAIL = b'FAIL'
54ID_STAT = b'STAT'
55ID_RECV = b'RECV'
56ID_DATA = b'DATA'
57ID_DONE = b'DONE'
58ID_SEND = b'SEND'
59ID_LIST = b'LIST'
60ID_DENT = b'DENT'
61
62DEFAULT_ENCODING = "ISO-8859-1"
63SYNC_DATA_MAX = 64 * 1024
64REMOTE_PATH_MAX_LENGTH = 1024
65SOCK_DATA_MAX = 256
66
67INSTALL_TIMEOUT = 2 * 60 * 1000
68DEFAULT_TIMEOUT = 40 * 1000
69
70MAX_CONNECT_ATTEMPT_COUNT = 10
71DATA_UNIT_LENGTH = 4
72HEXADECIMAL_NUMBER = 16
73SPECIAL_FILE_MODE = 41471
74FORMAT_BYTES_LENGTH = 4
75DEFAULT_OFFSET_OF_INT = 4
76
77DEFAULT_PORT = 5037
78INVALID_MODE_CODE = -1
79LOG = platform_logger("Hdc")
80
81
82class HdcMonitor:
83    """
84    A Device monitor.
85    This monitor connects to the Device Connector, gets device and
86    debuggable process information from it.
87    """
88    MONITOR_MAP = {}
89
90    def __init__(self, host="127.0.0.1", port=None, device_connector=None):
91        self.channel = dict()
92        self.channel.setdefault("host", host)
93        self.channel.setdefault("port", port)
94        self.main_hdc_connection = None
95        self.connection_attempt = 0
96        self.is_stop = False
97        self.monitoring = False
98        self.server = device_connector
99        self.devices = []
100
101    @staticmethod
102    def get_instance(host, port=None, device_connector=None):
103        if host not in HdcMonitor.MONITOR_MAP:
104            monitor = HdcMonitor(host, port, device_connector)
105            HdcMonitor.MONITOR_MAP[host] = monitor
106            LOG.debug("HdcMonitor map add host %s, map is %s" %
107                      (host, HdcMonitor.MONITOR_MAP))
108
109        return HdcMonitor.MONITOR_MAP[host]
110
111    def start(self):
112        """
113        Starts the monitoring.
114        """
115        try:
116            LOG.debug("HdcMonitor usb type is %s" % self.server.usb_type)
117            bridge_name = 'hdc_std'
118            if self.server.usb_type == DeviceConnectorType.hdc:
119                # tell if hdc has already been in the environ path.
120                env_hdc = shutil.which(bridge_name)
121                # if not, add xdevice's own hdc path to environ path.
122                if env_hdc is None:
123                    if os.name == "nt":
124                        hdc = get_file_absolute_path(
125                            "%s.exe" % bridge_name, alt_dir=os.path.join(
126                                "tools", bridge_name, "windows"))
127                    else:
128                        hdc = get_file_absolute_path(
129                            bridge_name, alt_dir=os.path.join
130                            ("tools", bridge_name, "linux"))
131                    xdevice_hdc_path = os.path.dirname(hdc)
132                    os.environ['PATH'] = os.pathsep.join(
133                        (os.environ['PATH'], xdevice_hdc_path)
134                    )
135                if not is_proc_running(bridge_name):
136                    self.start_hdc(
137                        connector=bridge_name,
138                        local_port=self.channel.setdefault(
139                            "port", DEFAULT_PORT))
140                    time.sleep(1)
141
142            server_thread = threading.Thread(target=self.loop_monitor,
143                                             name="HdcMonitor", args=())
144            server_thread.setDaemon(True)
145            server_thread.start()
146        except FileNotFoundError as _:
147            LOG.error("HdcMonitor can't find connector, init device "
148                      "environment failed!")
149
150    @staticmethod
151    def stop_hdc(connector="hdc_std"):
152        """
153        Starts the hdc host side server.
154        """
155        if connector == "hdc_std":
156            if is_proc_running(connector):
157                try:
158                    LOG.debug("HdcMonitor hdc kill")
159                    exec_cmd([connector, "kill"])
160                except ParamError as error:
161                    LOG.debug("HdcMonitor hdc kill error:%s" % error)
162                except FileNotFoundError as _:
163                    LOG.warning('Cannot kill hdc process, '
164                                'please close it manually!')
165
166    def stop(self):
167        """
168        Stops the monitoring.
169        """
170        for host in HdcMonitor.MONITOR_MAP:
171            LOG.debug("HdcMonitor stop host %s" % host)
172            monitor = HdcMonitor.MONITOR_MAP[host]
173            try:
174                monitor.is_stop = True
175                if monitor.main_hdc_connection is not None:
176                    monitor.main_hdc_connection.shutdown(2)
177                    monitor.main_hdc_connection.close()
178                    monitor.main_hdc_connection = None
179            except (socket.error, socket.gaierror, socket.timeout) as _:
180                LOG.error("HdcMonitor close socket exception")
181        HdcMonitor.MONITOR_MAP.clear()
182        LOG.debug("HdcMonitor hdc monitor stop!")
183        LOG.debug("HdcMonitor map is %s" % HdcMonitor.MONITOR_MAP)
184
185    def loop_monitor(self):
186        """
187        Monitors the devices. This connects to the Debug Bridge
188        """
189        while not self.is_stop:
190            try:
191                if self.main_hdc_connection is None:
192                    self.main_hdc_connection = self.open_hdc_connection()
193                    if self.main_hdc_connection is None:
194                        self.connection_attempt += 1
195
196                        if self.connection_attempt > MAX_CONNECT_ATTEMPT_COUNT:
197                            self.is_stop = True
198                            LOG.error(
199                                "HdcMonitor attempt %s, can't connect to hdc "
200                                "for Device List Monitoring" %
201                                str(self.connection_attempt))
202                            raise HdcError(
203                                "HdcMonitor cannot connect hdc server(%s %s),"
204                                " please check!" %
205                                (self.channel.get("host"),
206                                 str(self.channel.get("post"))))
207
208                        LOG.debug(
209                            "HdcMonitor Connection attempts: %s" %
210                            str(self.connection_attempt))
211
212                        time.sleep(2)
213                    else:
214                        LOG.debug(
215                            "HdcMonitor Connected to hdc_std for device "
216                            "monitoring, main_hdc_connection is %s" %
217                            self.main_hdc_connection)
218
219                if self.main_hdc_connection and not self.monitoring:
220                    LOG.debug("start monitoring list targets")
221                    self.monitoring_list_targets()
222                    len_buf = HdcHelper.read(self.main_hdc_connection,
223                                             DATA_UNIT_LENGTH)
224                    length = struct.unpack("!I", len_buf)[0]
225                    LOG.debug("had received length is: %s" % length)
226                    if length >= 0:
227                        self.monitoring = True
228                        self.process_incoming_target_data(length)
229            except (HdcError, Exception) as _:
230                LOG.debug("loop_monitor happened error: %s" % _)
231                self.handle_exception_monitor_loop()
232                break
233
234    def handle_exception_monitor_loop(self):
235        LOG.debug("Handle exception monitor loop: %s" %
236                  self.main_hdc_connection)
237        if self.main_hdc_connection is None:
238            return
239        self.main_hdc_connection.close()
240        LOG.debug("Handle exception monitor loop, main hdc connection closed, "
241                  "main hdc connection: %s" % self.main_hdc_connection)
242
243    def monitoring_list_targets(self):
244        HdcHelper.handle_shake(self.main_hdc_connection)
245        request = HdcHelper.form_hdc_request('list targets')
246        HdcHelper.write(self.main_hdc_connection, request)
247
248    def device_list_monitoring(self):
249        request = HdcHelper.form_hdc_request("host:track-devices")
250        HdcHelper.write(self.main_hdc_connection, request)
251        resp = HdcHelper.read_hdc_response(self.main_hdc_connection)
252        if not resp.okay:
253            LOG.error("HdcMonitor hdc rejected shell command")
254            raise Exception(resp.message)
255        else:
256            LOG.debug(
257                'HdcMonitor execute command success:send device_list '
258                'monitoring request')
259        return True
260
261    def process_incoming_device_data(self, length):
262        local_array_list = []
263        if length > 0:
264            data_buf = HdcHelper.read(self.main_hdc_connection, length)
265            data_str = HdcHelper.reply_to_string(data_buf)
266            lines = data_str.split('\n')
267            for line in lines:
268                items = line.strip().split('\t')
269                if len(items) != 2:
270                    continue
271                device_instance = self._get_device_instance(
272                    items, DeviceOsType.default)
273                local_array_list.append(device_instance)
274
275        self.update_devices(local_array_list)
276
277    def process_incoming_target_data(self, length):
278        local_array_list = []
279        data_buf = HdcHelper.read(self.main_hdc_connection, length)
280        data_str = HdcHelper.reply_to_string(data_buf)
281        if 'Empty' not in data_str:
282            lines = data_str.split('\n')
283            for line in lines:
284                items = line.strip().split('\t')
285                if not items[0] :
286                    continue
287                items.append(DeviceState.ONLINE.value)
288                device_instance = self._get_device_instance(
289                    items, DeviceOsType.default)
290                local_array_list.append(device_instance)
291        else:
292            LOG.debug("please check device actually.[%s]" % data_str)
293        LOG.debug("process target data, local_array_list" % local_array_list)
294        self.update_devices(local_array_list)
295
296    def _get_device_instance(self, items, os_type):
297        device = get_plugin(plugin_type=Plugin.DEVICE, plugin_id=os_type)[0]
298        device_instance = device.__class__()
299        device_instance.__set_serial__(items[0])
300        device_instance.host = self.channel.get("host")
301        device_instance.port = self.channel.get("port")
302        LOG.debug("dmlib get_device_instance %s %s %s" %
303                  (device_instance.device_sn,
304                   device_instance.host, device_instance.port))
305        device_instance.device_state = DeviceState.get_state(items[1])
306        return device_instance
307
308    def update_devices(self, param_array_list):
309        devices = [item for item in self.devices]
310        devices.reverse()
311        for local_device1 in devices:
312            k = 0
313            for local_device2 in param_array_list:
314                if local_device1.device_sn == local_device2.device_sn and \
315                        local_device1.device_os_type == \
316                        local_device2.device_os_type:
317                    k = 1
318                    if local_device1.device_state != \
319                            local_device2.device_state:
320                        local_device1.device_state = local_device2.device_state
321                        self.server.device_changed(local_device1)
322                        param_array_list.remove(local_device2)
323                        break
324
325            if k == 0:
326                self.devices.remove(local_device1)
327                self.server.device_disconnected(local_device1)
328        for local_device in param_array_list:
329            self.devices.append(local_device)
330            self.server.device_connected(local_device)
331
332    def open_hdc_connection(self):
333        """
334        Attempts to connect to the debug bridge server. Return a connect socket
335        if success, null otherwise.
336        """
337        sock = None
338        try:
339            LOG.debug(
340                "HdcMonitor connecting to hdc for Device List Monitoring")
341            LOG.debug("HdcMonitor socket connection host: %s, port: %s" %
342                      (str(convert_ip(self.channel.get("host"))),
343                       str(int(self.channel.get("port")))))
344
345            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
346            sock.connect((self.channel.get("host"),
347                          int(self.channel.get("port"))))
348            return sock
349        except (socket.error, socket.gaierror, socket.timeout) as exception:
350            LOG.error("HdcMonitor hdc socket connection Error: %s, "
351                      "host is %s, port is %s" % (str(exception),
352                                                  self.channel.get("host"),
353                                                  self.channel.get("port")))
354            return sock
355
356    def start_hdc(self, connector="hdc_std", kill=False, local_port=None):
357        """Starts the hdc host side server.
358
359        Args:
360            connector: connector type, like "hdc"
361            kill: if True, kill exist host side server
362            local_port: local port to start host side server
363
364        Returns:
365            None
366        """
367        if connector == "hdc_std":
368            if kill:
369                LOG.debug("HdcMonitor hdc kill")
370                exec_cmd([connector, "kill"])
371            LOG.debug("HdcMonitor hdc start")
372            exec_cmd(
373                [connector, "-s", "tcp:%s" % local_port, "reset"],
374                error_print=False)
375
376
@dataclass
class HdcResponse:
    """Response from HDC."""
    # NOTE: neither attribute below carries a type annotation, so
    # @dataclass does not treat them as fields; they remain plain class
    # attributes (no generated constructor parameters for them).
    okay = ID_OKAY  # first 4 bytes in response were "OKAY"?
    message = ""  # diagnostic string if okay is false
382
383
384class SyncService:
385    """
386    Sync service class to push/pull to/from devices/emulators,
387    through the debug bridge.
388    """
389    def __init__(self, device, host=None, port=None):
390        self.device = device
391        self.host = host
392        self.port = port
393        self.sock = None
394
395    def open_sync(self, timeout=DEFAULT_TIMEOUT):
396        """
397        Opens the sync connection. This must be called before any calls to
398        push[File] / pull[File].
399        Return true if the connection opened, false if hdc refuse the
400        connection. This can happen device is invalid.
401        """
402        LOG.debug("open sync, timeout=%s" % int(timeout/1000))
403        self.sock = HdcHelper.socket(host=self.host, port=self.port,
404                                     timeout=timeout)
405        HdcHelper.set_device(self.device, self.sock)
406
407        request = HdcHelper.form_hdc_request("sync:")
408        HdcHelper.write(self.sock, request)
409
410        resp = HdcHelper.read_hdc_response(self.sock)
411        if not resp.okay:
412            self.device.log.error(
413                "Got unhappy response from HDC sync req: %s" % resp.message)
414            raise HdcError(
415                "Got unhappy response from HDC sync req: %s" % resp.message)
416
417    def close(self):
418        """
419        Closes the connection.
420        """
421        if self.sock is not None:
422            try:
423                self.sock.close()
424            except socket.error as error:
425                LOG.error("socket close error: %s" % error, error_no="00420")
426            finally:
427                self.sock = None
428
429    def pull_file(self, remote, local, is_create=False):
430        """
431        Pulls a file.
432        The top directory won't be created if is_create is False (by default)
433        and vice versa
434        """
435        mode = self.read_mode(remote)
436        self.device.log.debug("Remote file %s mode is %d" % (remote, mode))
437        if mode == 0:
438            raise HdcError("Remote object doesn't exist!")
439
440        if str(mode).startswith("168"):
441            if is_create:
442                remote_file_split = os.path.split(remote)[-1] \
443                    if os.path.split(remote)[-1] else os.path.split(remote)[-2]
444                remote_file_basename = os.path.basename(remote_file_split)
445                new_local = os.path.join(local, remote_file_basename)
446                create_dir(new_local)
447            else:
448                new_local = local
449
450            collect_receiver = CollectingOutputReceiver()
451            HdcHelper.execute_shell_command(self.device, "ls %s" % remote,
452                                            receiver=collect_receiver)
453            files = collect_receiver.output.split()
454            for file_name in files:
455                self.pull_file("%s/%s" % (remote, file_name),
456                               new_local, is_create=True)
457        elif mode == SPECIAL_FILE_MODE:
458            self.device.log.info("skipping special file '%s'" % remote)
459        else:
460            if os.path.isdir(local):
461                local = os.path.join(local, os.path.basename(remote))
462
463            self.do_pull_file(remote, local)
464
465    def do_pull_file(self, remote, local):
466        """
467        Pulls a remote file
468        """
469        self.device.log.info(
470            "%s pull %s to %s" % (convert_serial(self.device.device_sn),
471                                  remote, local))
472        remote_path_content = remote.encode(DEFAULT_ENCODING)
473        if len(remote_path_content) > REMOTE_PATH_MAX_LENGTH:
474            raise HdcError("Remote path is too long.")
475
476        msg = self.create_file_req(ID_RECV, remote_path_content)
477        HdcHelper.write(self.sock, msg)
478        pull_result = HdcHelper.read(self.sock, DATA_UNIT_LENGTH * 2)
479        if not self.check_result(pull_result, ID_DATA) and \
480                not self.check_result(pull_result, ID_DONE):
481            raise HdcError(self.read_error_message(pull_result))
482        if platform.system() == "Windows":
483            flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND | os.O_BINARY
484        else:
485            flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND
486        pulled_file_open = os.open(local, flags, FilePermission.mode_755)
487        with os.fdopen(pulled_file_open, "wb") as pulled_file:
488            while True:
489                if self.check_result(pull_result, ID_DONE):
490                    break
491
492                if not self.check_result(pull_result, ID_DATA):
493                    raise HdcError(self.read_error_message(pull_result))
494
495                try:
496                    length = self.swap32bit_from_array(
497                        pull_result, DEFAULT_OFFSET_OF_INT)
498                except IndexError as index_error:
499                    self.device.log.debug("do_pull_file: %s" %
500                                          str(pull_result))
501                    if pull_result == ID_DATA:
502                        pull_result = self.sock.recv(DATA_UNIT_LENGTH)
503                        self.device.log.debug(
504                            "do_pull_file: %s" % str(pull_result))
505                        length = self.swap32bit_from_array(pull_result, 0)
506                        self.device.log.debug("do_pull_file: %s" % str(length))
507                    else:
508                        raise IndexError(str(index_error))
509
510                if length > SYNC_DATA_MAX:
511                    raise HdcError("Receiving too much data.")
512
513                pulled_file.write(HdcHelper.read(self.sock, length))
514                pulled_file.flush()
515                pull_result = self.sock.recv(DATA_UNIT_LENGTH * 2)
516
517    def push_file(self, local, remote, is_create=False):
518        """
519        Push a single file.
520        The top directory won't be created if is_create is False (by default)
521        and vice versa
522        """
523        if not os.path.exists(local):
524            raise HdcError("Local path doesn't exist.")
525
526        if os.path.isdir(local):
527            if is_create:
528                local_file_split = os.path.split(local)[-1] \
529                    if os.path.split(local)[-1] else os.path.split(local)[-2]
530                local_file_basename = os.path.basename(local_file_split)
531                remote = "{}/{}".format(
532                    remote, local_file_basename)
533                HdcHelper.execute_shell_command(
534                    self.device, "mkdir -p %s" % remote)
535
536            for child in os.listdir(local):
537                file_path = os.path.join(local, child)
538                if os.path.isdir(file_path):
539                    self.push_file(
540                        file_path,  "%s/%s" % (remote, child),
541                        is_create=False)
542                else:
543                    self.do_push_file(file_path, "%s/%s" % (remote, child))
544        else:
545            self.do_push_file(local, remote)
546
547    def do_push_file(self, local, remote):
548        """
549        Push a single file
550
551        Args:
552        ------------
553        local : string
554            the local file to push
555        remote : string
556            the remote file (length max is 1024)
557        """
558        mode = self.read_mode(remote)
559        self.device.log.debug("Remote file %s mode is %d" % (remote, mode))
560        if self.device.usb_type == DeviceConnectorType.hdc:
561            self.device.log.debug("%s execute command: hdc push %s %s" % (
562                convert_serial(self.device.device_sn), local, remote))
563        if str(mode).startswith("168"):
564            remote = "%s/%s" % (remote, os.path.basename(local))
565
566        try:
567            try:
568                remote_path_content = remote.encode(DEFAULT_ENCODING)
569            except UnicodeEncodeError:
570                remote_path_content = remote.encode("UTF-8")
571            if len(remote_path_content) > REMOTE_PATH_MAX_LENGTH:
572                raise HdcError("Remote path is too long.")
573
574            # create the header for the action
575            msg = self.create_send_file_req(ID_SEND, remote_path_content,
576                                            FilePermission.mode_644)
577
578            # and send it. We use a custom try/catch block to make the
579            # difference between file and network IO exceptions.
580            HdcHelper.write(self.sock, msg)
581            flags = os.O_RDONLY
582            modes = stat.S_IWUSR | stat.S_IRUSR
583            with os.fdopen(os.open(local, flags, modes), "rb") as test_file:
584                while True:
585                    if platform.system() == "Linux":
586                        data = test_file.read(1024 * 4)
587                    else:
588                        data = test_file.read(SYNC_DATA_MAX)
589
590                    if not data:
591                        break
592
593                    buf = struct.pack(
594                        "%ds%ds%ds" % (len(ID_DATA), FORMAT_BYTES_LENGTH,
595                                       len(data)), ID_DATA,
596                        self.swap32bits_to_bytes(len(data)), data)
597                    self.sock.send(buf)
598        except Exception as exception:
599            self.device.log.error("exception %s" % exception)
600            raise exception
601
602        msg = self.create_req(ID_DONE, int(time.time()))
603        HdcHelper.write(self.sock, msg)
604        result = HdcHelper.read(self.sock, DATA_UNIT_LENGTH * 2)
605        if not self.check_result(result, ID_OKAY):
606            self.device.log.error("exception %s" % result)
607            raise HdcError(self.read_error_message(result))
608
609    def read_mode(self, path):
610        """
611        Returns the mode of the remote file.
612        Return an Integer containing the mode if all went well or null
613        """
614        msg = self.create_file_req(ID_STAT, path)
615        HdcHelper.write(self.sock, msg)
616
617        # read the result, in a byte array containing 4 ints
618        stat_result = HdcHelper.read(self.sock, DATA_UNIT_LENGTH * 4)
619        if not self.check_result(stat_result, ID_STAT):
620            return INVALID_MODE_CODE
621
622        return self.swap32bit_from_array(stat_result, DEFAULT_OFFSET_OF_INT)
623
624    def create_file_req(self, command, path):
625        """
626        Creates the data array for a file request. This creates an array with a
627        4 byte command + the remote file name.
628
629        Args:
630        ------------
631        command :
632            the 4 byte command (ID_STAT, ID_RECV, ...)
633        path : string
634            The path, as a byte array, of the remote file on which to execute
635            the command.
636
637        return:
638        ------------
639            return the byte[] to send to the device through hdc
640        """
641        if isinstance(path, str):
642            try:
643                path = path.encode(DEFAULT_ENCODING)
644            except UnicodeEncodeError as _:
645                path = path.encode("UTF-8")
646
647        return struct.pack(
648            "%ds%ds%ds" % (len(command), FORMAT_BYTES_LENGTH, len(path)),
649            command, self.swap32bits_to_bytes(len(path)), path)
650
651    def create_send_file_req(self, command, path, mode=0o644):
652        # make the mode into a string
653        mode_str = ",%s" % str(mode & FilePermission.mode_777)
654        mode_content = mode_str.encode(DEFAULT_ENCODING)
655        return struct.pack(
656            "%ds%ds%ds%ds" % (len(command), FORMAT_BYTES_LENGTH, len(path),
657                              len(mode_content)),
658            command, self.swap32bits_to_bytes(len(path) + len(mode_content)),
659            path, mode_content)
660
661    def create_req(self, command, value):
662        """
663        Create a command with a code and an int values
664        """
665        return struct.pack("%ds%ds" % (len(command), FORMAT_BYTES_LENGTH),
666                           command, self.swap32bits_to_bytes(value))
667
668    @staticmethod
669    def check_result(result, code):
670        """
671        Checks the result array starts with the provided code
672
673        Args:
674        ------------
675        result :
676            the result array to check
677        path : string
678            the 4 byte code
679
680        return:
681        ------------
682        bool
683            return true if the code matches
684        """
685        return result[0:4] == code[0:4]
686
687    def read_error_message(self, result):
688        """
689        Reads an error message from the opened Socket.
690
691        Args:
692        ------------
693        result :
694            the current hdc result. Must contain both FAIL and the length of
695            the message.
696        """
697        if self.check_result(result, ID_FAIL):
698            length = self.swap32bit_from_array(result, 4)
699            if length > 0:
700                return str(HdcHelper.read(self.sock, length))
701
702        return None
703
704    @staticmethod
705    def swap32bits_to_bytes(value):
706        """
707        Swaps an unsigned value around, and puts the result in an bytes that
708        can be sent to a device.
709
710        Args:
711        ------------
712        value :
713            the value to swap.
714        """
715        return bytes([value & 0x000000FF,
716                      (value & 0x0000FF00) >> 8,
717                      (value & 0x00FF0000) >> 16,
718                      (value & 0xFF000000) >> 24])
719
720    @staticmethod
721    def swap32bit_from_array(value, offset):
722        """
723        Reads a signed 32 bit integer from an array coming from a device.
724
725        Args:
726        ------------
727        value :
728            the array containing the int
729        offset:
730            the offset in the array at which the int starts
731
732        Return:
733        ------------
734        int
735            the integer read from the array
736        """
737        result = 0
738        result |= (int(value[offset])) & 0x000000FF
739        result |= (int(value[offset + 1]) & 0x000000FF) << 8
740        result |= (int(value[offset + 2]) & 0x000000FF) << 16
741        result |= (int(value[offset + 3]) & 0x000000FF) << 24
742
743        return result
744
745
746class HdcHelper:
747    @staticmethod
748    def push_file(device, local, remote, is_create=False,
749                  timeout=DEFAULT_TIMEOUT):
750        if device.usb_type == DeviceConnectorType.hdc:
751            device.log.info("%s execute command: hdc file send %s %s" %
752                            (convert_serial(device.device_sn), local, remote))
753        if not os.path.exists(local):
754            raise HdcError("Local path doesn't exist.")
755        sock = HdcHelper.socket(host=device.host, port=device.port,
756                                timeout=timeout)
757        HdcHelper.handle_shake(sock, device.device_sn)
758        request = HdcHelper.form_hdc_request("file send %s %s" %
759                                             (local, remote))
760        HdcHelper.write(sock, request)
761        reply = HdcHelper.read(sock, 4)
762        length = struct.unpack("!I", reply)[0]
763        data_buf = HdcHelper.read(sock, length)
764        data_str = HdcHelper.reply_to_string(data_buf)
765        device.log.info(data_str)
766
767    @staticmethod
768    def pull_file(device, remote, local, is_create=False,
769                  timeout=DEFAULT_TIMEOUT):
770        if device.usb_type == DeviceConnectorType.hdc:
771            device.log.info("%s execute command: hdc file recv %s to %s" %
772                            (convert_serial(device.device_sn), remote, local))
773
774        sock = HdcHelper.socket(host=device.host, port=device.port,
775                                timeout=timeout)
776        HdcHelper.handle_shake(sock, device.device_sn)
777        request = HdcHelper.form_hdc_request("file recv %s %s" %
778                                             (remote, local))
779        HdcHelper.write(sock, request)
780        reply = HdcHelper.read(sock, 4)
781        length = struct.unpack("!I", reply)[0]
782        data_buf = HdcHelper.read(sock, length)
783        data_str = HdcHelper.reply_to_string(data_buf)
784        device.log.info(data_str)
785
786    @staticmethod
787    def _install_remote_package(device, remote_file_path, command):
788        receiver = CollectingOutputReceiver()
789        cmd = "pm install %s %s" % (command.strip(), remote_file_path)
790        HdcHelper.execute_shell_command(device, cmd, INSTALL_TIMEOUT, receiver)
791        return receiver.output
792
793    @staticmethod
794    def install_package(device, package_file_path, command):
795        device.log.info("%s install %s" % (convert_serial(device.device_sn),
796                                           package_file_path))
797        remote_file_path = "/data/local/tmp/%s" % os.path.basename(
798            package_file_path)
799
800        result = HdcHelper._install_remote_package(device, remote_file_path,
801                                                   command)
802        HdcHelper.execute_shell_command(device, "rm %s " % remote_file_path)
803        return result
804
805    @staticmethod
806    def uninstall_package(device, package_name):
807        receiver = CollectingOutputReceiver()
808        command = "bm uninstall -n %s" % package_name
809        device.log.info("%s %s" % (convert_serial(device.device_sn), command))
810        HdcHelper.execute_shell_command(device, command, INSTALL_TIMEOUT,
811                                        receiver)
812        return receiver.output
813
814    @staticmethod
815    def reboot(device, into=None):
816        if device.usb_type == DeviceConnectorType.hdc:
817            device.log.info("%s execute command: hdc target boot" %
818                            convert_serial(device.device_sn))
819        with HdcHelper.socket(host=device.host, port=device.port) as sock:
820            HdcHelper.handle_shake(sock, device.device_sn)
821            request = HdcHelper.form_hdc_request("reboot")
822
823            HdcHelper.write(sock, request)
824
825    @staticmethod
826    def execute_shell_command(device, command, timeout=DEFAULT_TIMEOUT,
827                              receiver=None, **kwargs):
828        """
829        Executes a shell command on the device and retrieve the output.
830
831        Args:
832        ------------
833        device : IDevice
834            on which to execute the command.
835        command : string
836            the shell command to execute
837        timeout : int
838            max time between command output. If more time passes between
839            command output, the method will throw
840            ShellCommandUnresponsiveException (ms).
841        """
842        try:
843            if not timeout:
844                timeout = DEFAULT_TIMEOUT
845            with HdcHelper.socket(host=device.host, port=device.port,
846                                  timeout=timeout) as sock:
847                output_flag = kwargs.get("output_flag", True)
848                timeout_msg = " with timeout %ss" % str(timeout/1000)
849                end_mark = kwargs.get("end_mark", "")
850                read_timeout = kwargs.get("read_timeout", None)
851                if device.usb_type == DeviceConnectorType.hdc:
852                    message = "%s execute command: hdc shell %s%s" % \
853                              (convert_serial(device.device_sn), command,
854                               timeout_msg)
855                if output_flag:
856                    LOG.info(message)
857                else:
858                    LOG.debug(message)
859
860                HdcHelper.handle_shake(sock, device.device_sn)
861                request = HdcHelper.form_hdc_request("shell %s" % command)
862                HdcHelper.write(sock, request)
863                resp = HdcResponse()
864                resp.okay = True
865
866                from xdevice import Scheduler
867                start_time = int(time.time())
868                while True:
869                    len_buf = HdcHelper.read(sock, DATA_UNIT_LENGTH)
870                    if len_buf:
871                        length = struct.unpack("!I", len_buf)[0]
872                    else:
873                        break
874                    data = sock.recv(length)
875                    ret = HdcHelper.reply_to_string(data)
876                    if ret:
877                        if receiver:
878                            receiver.__read__(ret)
879                        else:
880                            LOG.debug(ret)
881                    if end_mark and end_mark in ret:
882                        break
883                    if read_timeout and \
884                            int(time.time()) - start_time > read_timeout:
885                        break
886                    if not Scheduler.is_execute:
887                        raise ExecuteTerminate()
888                return resp
889        except socket.timeout as _:
890            device.log.error("%s shell %s timeout[%sS]" % (
891                convert_serial(device.device_sn), command, str(timeout/1000)))
892            raise ShellCommandUnresponsiveException()
893        finally:
894            if receiver:
895                receiver.__done__()
896
897    @staticmethod
898    def set_device(device, sock):
899        """
900        tells hdc to talk to a specific device
901        if the device is not -1, then we first tell hdc we're looking to talk
902        to a specific device
903        """
904        msg = "host:transport:%s" % device.device_sn
905        device_query = HdcHelper.form_hdc_request(msg)
906        HdcHelper.write(sock, device_query)
907        resp = HdcHelper.read_hdc_response(sock)
908        if not resp.okay:
909            raise HdcCommandRejectedException(resp.message)
910
911    @staticmethod
912    def form_hdc_request(req):
913        """
914        Create an ASCII string preceded by four hex digits.
915        """
916        try:
917            if not req.endswith('\0'):
918                req = "%s\0" % req
919            req = req.encode("utf-8")
920            fmt = "!I%ss" % len(req)
921            result = struct.pack(fmt, len(req), req)
922        except UnicodeEncodeError as ex:
923            LOG.error(ex)
924            raise ex
925        return result
926
927    @staticmethod
928    def read_hdc_response(sock, read_diag_string=False):
929        """
930        Reads the response from HDC after a command.
931
932        Args:
933        ------------
934        read_diag_string :
935            If true, we're expecting an OKAY response to be followed by a
936            diagnostic string. Otherwise, we only expect the diagnostic string
937            to follow a FAIL.
938        """
939        resp = HdcResponse()
940        reply = HdcHelper.read(sock, DATA_UNIT_LENGTH)
941        if HdcHelper.is_okay(reply):
942            resp.okay = True
943        else:
944            read_diag_string = True
945            resp.okay = False
946
947        while read_diag_string:
948            len_buf = HdcHelper.read(sock, DATA_UNIT_LENGTH)
949            len_str = HdcHelper.reply_to_string(len_buf)
950            msg = HdcHelper.read(sock, int(len_str, HEXADECIMAL_NUMBER))
951            resp.message = HdcHelper.reply_to_string(msg)
952            break
953
954        return resp
955
956    @staticmethod
957    def write(sock, req, timeout=5):
958        if isinstance(req, str):
959            req = req.encode(DEFAULT_ENCODING)
960        elif isinstance(req, list):
961            req = bytes(req)
962
963        start_time = time.time()
964        while req:
965            if time.time() - start_time > timeout:
966                LOG.debug("Socket write timeout, timeout:%ss" % timeout)
967                break
968            size = sock.send(req)
969            if size < 0:
970                raise DeviceError("channel EOF")
971
972            req = req[size:]
973            time.sleep(5 / 1000)
974
975    @staticmethod
976    def read(sock, length, timeout=5):
977        data = b''
978        recv_len = 0
979        start_time = time.time()
980        exc_num = 3
981        while length - recv_len > 0:
982            if time.time() - start_time > timeout:
983                LOG.debug("socket read timeout, timeout:%ss" % timeout)
984                break
985            try:
986                recv = sock.recv(length - recv_len)
987                if len(recv) > 0:
988                    time.sleep(5 / 1000)
989                else:
990                    break
991            except ConnectionResetError as error:
992                if exc_num <= 0:
993                    raise error
994                exc_num = exc_num - 1
995                recv = b''
996                time.sleep(1)
997                LOG.debug("ConnectionResetError occurs")
998
999            data += recv
1000            recv_len += len(recv)
1001
1002        return data
1003
1004    @staticmethod
1005    def is_okay(reply):
1006        """
1007        Checks to see if the first four bytes in "reply" are OKAY.
1008        """
1009        return reply[0:4] == ID_OKAY
1010
1011    @staticmethod
1012    def reply_to_string(reply):
1013        """
1014        Converts an HDC reply to a string.
1015        """
1016        try:
1017            return str(reply, encoding=DEFAULT_ENCODING)
1018        except (ValueError, TypeError) as _:
1019            return ""
1020
1021    @staticmethod
1022    def socket(host=None, port=None, timeout=None):
1023        end = time.time() + 10 * 60
1024        sock = None
1025
1026        while host not in HdcMonitor.MONITOR_MAP or \
1027                HdcMonitor.MONITOR_MAP[host].main_hdc_connection is None:
1028            LOG.debug("Host: %s, port: %s, HdcMonitor map is %s" % (
1029                host, port, HdcMonitor.MONITOR_MAP))
1030            if host in HdcMonitor.MONITOR_MAP:
1031                LOG.debug("Monitor main hdc connection is %s" %
1032                          HdcMonitor.MONITOR_MAP[host].main_hdc_connection)
1033            if time.time() > end:
1034                raise HdcError("Cannot detect HDC monitor!")
1035            time.sleep(2)
1036
1037        try:
1038            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1039            sock.connect((host, int(port)))
1040        except socket.error as exception:
1041            LOG.exception("Connect hdc server error: %s" % str(exception),
1042                          exc_info=False)
1043            raise exception
1044
1045        if sock is None:
1046            raise HdcError("Cannot connect hdc server!")
1047
1048        if timeout is not None:
1049            sock.setblocking(False)
1050            sock.settimeout(timeout/1000)
1051
1052        return sock
1053
1054    @staticmethod
1055    def handle_shake(connection, connect_key=""):
1056        reply = HdcHelper.read(connection, 48)
1057        reply_size = struct.unpack(">I12s32s", reply)
1058        banner_str = b'OHOS HDC'
1059        connect_key = connect_key.encode("utf-8")
1060        size = struct.calcsize('12s256s')
1061        fmt = "!I12s256s"
1062        pack_cmd = struct.pack(fmt, size, banner_str, connect_key)
1063        HdcHelper.write(connection, pack_cmd)
1064        return True
1065
1066
class DeviceConnector(object):
    """
    Owns the HdcMonitor for one host/port pair and forwards the device
    events it is told about to every registered listener.
    """
    __instance = None
    __init_flag = False

    def __init__(self, host=None, port=None, usb_type=None):
        """
        Args:
        ------------
        host : string
            address of the hdc server, "127.0.0.1" when empty
        port : int or string
            port of the hdc server; when empty and usb_type is hdc the
            HDC_SERVER_PORT environment variable (default 8710) is used,
            otherwise no port attribute is set
        usb_type :
            connector type
        """
        if DeviceConnector.__init_flag:
            return
        self.device_listeners = []
        self.device_monitor = None
        self.host = host or "127.0.0.1"
        self.usb_type = usb_type
        if port:
            self.port = int(port)
        elif usb_type == DeviceConnectorType.hdc:
            self.port = int(os.getenv("HDC_SERVER_PORT", 8710))

    def start(self):
        """Fetches the monitor for this host and port and starts it."""
        monitor = HdcMonitor.get_instance(
            self.host, self.port, device_connector=self)
        self.device_monitor = monitor
        monitor.start()

    def terminate(self):
        """Stops the monitor, if there is one, and forgets it."""
        if self.device_monitor:
            self.device_monitor.stop()
        self.device_monitor = None

    def add_device_change_listener(self, device_change_listener):
        """Registers a listener for device events."""
        self.device_listeners.append(device_change_listener)

    def remove_device_change_listener(self, device_change_listener):
        """Unregisters a listener, unknown listeners are ignored."""
        if device_change_listener not in self.device_listeners:
            return
        self.device_listeners.remove(device_change_listener)

    def device_connected(self, device):
        """Tells every listener that device got connected."""
        LOG.debug("DeviceConnector device_connected:host %s, port %s, "
                  "device_sn %s " % (self.host, self.port, device.device_sn))
        # a device reported for another host/port is only logged, the
        # listeners are notified regardless
        foreign = device.host != self.host or device.port != self.port
        if foreign:
            LOG.debug("DeviceConnector device error")
        for observer in self.device_listeners:
            observer.device_connected(device)

    def device_disconnected(self, device):
        """Tells every listener that device got disconnected."""
        LOG.debug("DeviceConnector device_disconnected:host %s, port %s, "
                  "device_sn %s" % (self.host, self.port, device.device_sn))
        # a device reported for another host/port is only logged, the
        # listeners are notified regardless
        foreign = device.host != self.host or device.port != self.port
        if foreign:
            LOG.debug("DeviceConnector device error")
        for observer in self.device_listeners:
            observer.device_disconnected(device)

    def device_changed(self, device):
        """Tells every listener that device changed."""
        LOG.debug("DeviceConnector device_changed:host %s, port %s, "
                  "device_sn %s" % (self.host, self.port, device.device_sn))
        # a device reported for another host/port is only logged, the
        # listeners are notified regardless
        foreign = device.host != self.host or device.port != self.port
        if foreign:
            LOG.debug("DeviceConnector device error")
        for observer in self.device_listeners:
            observer.device_changed(device)
1123
1124
class CollectingOutputReceiver(IShellReceiver):
    """Receiver that appends everything it is given to its output text."""

    def __init__(self):
        self.output = ""

    def __read__(self, output):
        """Appends output to what has been collected so far."""
        self.output = f"{self.output}{output}"

    def __error__(self, message):
        """Ignores error notifications."""

    def __done__(self, result_code="", message=""):
        """Ignores the end notification."""
1137
1138
class DisplayOutputReceiver(IShellReceiver):
    """
    Receiver that collects the output and logs every completed line at
    info level while the output arrives.
    """

    def __init__(self):
        self.output = ""
        self.unfinished_line = ""

    def _process_output(self, output, end_mark="\n"):
        """
        Splits output into completed lines.

        Text left over from the previous call is put in front of output.
        Whatever follows the last end_mark is kept in unfinished_line for
        the next call and is not returned.

        Return:
        ------------
        list
            the completed lines, without the end_mark
        """
        carried = self.unfinished_line
        if carried:
            self.unfinished_line = ""
            output = "".join((carried, output))

        # the last element is either the empty string behind a closing
        # end_mark or the start of a line that isn't finished yet
        *finished, tail = output.split(end_mark)
        if not output.endswith(end_mark):
            self.unfinished_line = tail
        return finished

    def __read__(self, output):
        """Stores output and logs its completed, non-blank lines."""
        self.output = f"{self.output}{output}"
        stripped = (entry.strip() for entry in self._process_output(output))
        for text in stripped:
            if text:
                LOG.info(text)

    def __error__(self, message):
        """Ignores error notifications."""

    def __done__(self, result_code="", message=""):
        """Ignores the end notification."""
1172
1173
def process_command_ret(ret, receiver):
    """
    Hands the output of a command to receiver, or logs it without one.

    Args:
    ------------
    ret : string
        the command output, nothing happens when it is empty
    receiver :
        gets ret through __read__ followed by __done__; when it is falsy
        every non-blank line of ret is written to the debug log instead

    Raise:
    ------------
    ReportException
        if the receiver raises while handling the output
    """
    try:
        if ret != "" and receiver:
            receiver.__read__(ret)
            receiver.__done__()
    except Exception as _:
        LOG.exception("Error generating log report.", exc_info=False)
        raise ReportException()

    if ret == "" or receiver:
        return
    for text in (entry.strip() for entry in ret.split("\n")):
        if text:
            LOG.debug(text)
1189