• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3"""
4Copyright (c) 2024 Huawei Device Co., Ltd.
5Licensed under the Apache License, Version 2.0 (the "License");
6you may not use this file except in compliance with the License.
7You may obtain a copy of the License at
8
9    http://www.apache.org/licenses/LICENSE-2.0
10
11Unless required by applicable law or agreed to in writing, software
12distributed under the License is distributed on an "AS IS" BASIS,
13WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14See the License for the specific language governing permissions and
15limitations under the License.
16
17Description: MISC action words.
18"""
19
20import asyncio
21import logging
22import os
23import stat
24import subprocess
25import threading
26
27from aw.websocket import WebSocket
28
29
class Utils(object):
    """Miscellaneous classmethod helpers ("action words").

    Covers message-id generation, talking to a debugger server through the
    WebSocket wrapper, and running ``hdc`` commands to mount the target and
    collect fault logs / hilog output.
    """

    @classmethod
    def message_id_generator(cls):
        """Yield consecutive integer message ids: 1, 2, 3, ... without end."""
        message_id = 1
        while True:
            yield message_id
            message_id += 1

    @classmethod
    def get_custom_protocols(cls):
        """Return a new list with the custom protocol names.

        NOTE(review): "callFunctionOn" is listed twice. The duplicate is kept
        so the returned list is unchanged for existing callers; confirm
        whether the last entry was meant to be a different protocol.
        """
        protocols = ["removeBreakpointsByUrl",
                     "setMixedDebugEnabled",
                     "replyNativeCalling",
                     "getPossibleAndSetBreakpointByUrl",
                     "dropFrame",
                     "setNativeRange",
                     "resetSingleStepper",
                     "callFunctionOn",
                     "smartStepInto",
                     "callFunctionOn"]
        return protocols

    @classmethod
    async def communicate_with_debugger_server(cls, instance_id, to_send_queue, received_queue, command, message_id):
        """
        Assembles and send the commands, then return the response.
        Send message to the debugger server corresponding to the to_send_queue.
        Return the response from the received_queue.

        Note: ``command`` is modified in place, its 'id' key is set to
        ``message_id`` before it is sent.
        """
        command['id'] = message_id
        await WebSocket.send_msg_to_debugger_server(instance_id, to_send_queue, command)
        response = await WebSocket.recv_msg_of_debugger_server(instance_id, received_queue)
        return response

    @classmethod
    async def async_wait_timeout(cls, task, timeout=3):
        """Await ``task`` for at most ``timeout`` seconds.

        Returns the result of ``task``. If the wait times out, an error is
        logged and None is returned instead of raising.
        """
        try:
            return await asyncio.wait_for(task, timeout)
        except asyncio.TimeoutError:
            logging.error('await timeout!')
            return None

    @classmethod
    def _run_hdc(cls, cmd, log_stdout=False):
        """Log ``cmd``, run it with captured output and require exit code 0.

        When ``log_stdout`` is true the stripped stdout is logged before the
        exit code is checked. Returns the ``subprocess.CompletedProcess``.
        Raises AssertionError when the command exits with a non-zero code.
        """
        logging.info(' '.join(cmd))
        result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if log_stdout:
            logging.info(result.stdout.strip())
        assert result.returncode == 0, (
            f"command failed with exit code {result.returncode}: "
            f"{' '.join(cmd)}; stderr={result.stderr!r}"
        )
        return result

    @classmethod
    def hdc_target_mount(cls):
        """Run ``hdc target mount`` and require its output to be 'Mount finish'.

        Raises AssertionError when the decoded, stripped stdout differs.
        """
        mount_cmd = ['hdc', 'target', 'mount']
        # NOTE: this message is emitted before the command runs, despite its wording.
        logging.info('Mount finish: ' + ' '.join(mount_cmd))
        mount_result = subprocess.run(mount_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        logging.info(mount_result.stdout.strip())
        output = mount_result.stdout.decode('utf-8').strip()
        assert output == 'Mount finish', (
            f"unexpected output from {' '.join(mount_cmd)}: "
            f"stdout={output!r}, stderr={mount_result.stderr!r}"
        )

    @classmethod
    def clear_fault_log(cls):
        """Run ``hdc shell rm`` on the fault logger directory contents.

        Raises AssertionError when the command exits with a non-zero code.
        """
        cls._run_hdc(['hdc', 'shell', 'rm', '/data/log/faultlog/faultlogger/*'])

    @classmethod
    def save_fault_log(cls, log_path):
        """Receive the fault logger directory into ``log_path`` via ``hdc file recv``.

        ``log_path`` is created when missing. Raises AssertionError when the
        command exits with a non-zero code.
        """
        os.makedirs(log_path, exist_ok=True)
        cls._run_hdc(['hdc', 'file', 'recv', '/data/log/faultlog/faultlogger/', log_path],
                     log_stdout=True)

    @classmethod
    def save_hilog(cls, log_path, file_name, debug_on=False):
        """Start streaming ``hdc shell hilog`` output into a file.

        The output file is ``file_name`` inside ``log_path`` (created when
        missing); an existing file is truncated. Before streaming starts,
        ``hdc shell hilog -r`` is run, followed by the extra configuration
        commands below when ``debug_on`` is true.

        Returns a tuple ``(hilog_process, write_thread)``: the running
        ``subprocess.Popen`` object and the started thread that copies its
        output to the file. The thread ends when the process output reaches
        EOF or the stream is closed.

        Raises AssertionError when a configuration command exits non-zero.
        """
        os.makedirs(log_path, exist_ok=True)

        # config the hilog
        config_cmds = [['hdc', 'shell', 'hilog', '-r']]
        if debug_on:
            config_cmds += [
                ['hdc', 'shell', 'hilog', '-G', '16M'],
                ['hdc', 'shell', 'hilog', '-Q', 'pidoff'],
                ['hdc', 'shell', 'hilog', '-Q', 'domainoff'],
                ['hdc', 'shell', 'hilog', '-b', 'd'],
                ['hdc', 'shell', 'setprop', 'persist.sys.hilog.debug.on', 'true'],
                ['hdc', 'shell', 'hilog', '-b', 'DEBUG'],
            ]
        for cmd in config_cmds:
            cls._run_hdc(cmd)

        # Open the output file first so a failure here does not leave an
        # orphaned hilog process behind. os.path.join keeps the path valid on
        # every host OS; O_TRUNC drops stale content of an earlier run.
        log_file = os.fdopen(os.open(os.path.join(log_path, file_name),
                                     os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
                                     stat.S_IRUSR | stat.S_IWUSR), 'wb')

        # create a sub-process to receive the hilog
        try:
            hilog_process = subprocess.Popen(['hdc', 'shell', 'hilog'],
                                             stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        except Exception:
            log_file.close()
            raise

        def write():
            try:
                for line in iter(hilog_process.stdout.readline, b''):
                    log_file.write(line)
                    log_file.flush()
            except ValueError:
                # Raised when the stream is closed while it is being read.
                logging.info('hilog stream is closed.')
            finally:
                log_file.close()

        write_thread = threading.Thread(target=write)
        write_thread.start()

        return hilog_process, write_thread
160