• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3"""
4Copyright (c) 2024 Huawei Device Co., Ltd.
5Licensed under the Apache License, Version 2.0 (the "License");
6you may not use this file except in compliance with the License.
7You may obtain a copy of the License at
8
9    http://www.apache.org/licenses/LICENSE-2.0
10
11Unless required by applicable law or agreed to in writing, software
12distributed under the License is distributed on an "AS IS" BASIS,
13WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14See the License for the specific language governing permissions and
15limitations under the License.
16
17Description: MISC action words.
18"""
19
20import asyncio
21import logging
22import os
23import stat
24import subprocess
25import threading
26
27from aw.websocket import WebSocket
28
29
class Utils(object):
    """Miscellaneous action words used by the test cases.

    Bundles small helpers for message-id generation, request/response
    exchange through the project ``WebSocket`` wrapper, and invocations of
    the ``hdc`` command-line tool (mount, file transfer, fault log and hilog
    collection).  The ``hdc`` helpers signal failure with ``AssertionError``.
    """

    @classmethod
    def message_id_generator(cls):
        """Yield consecutive integer message ids starting at 1, without end."""
        message_id = 1
        while True:
            yield message_id
            message_id += 1

    @classmethod
    def get_custom_protocols(cls):
        """Return a new list holding the names of the custom protocols."""
        return ["removeBreakpointsByUrl",
                "setMixedDebugEnabled",
                "replyNativeCalling",
                "getPossibleAndSetBreakpointByUrl",
                "dropFrame",
                "setNativeRange",
                "resetSingleStepper",
                "callFunctionOn",
                "smartStepInto"]

    @classmethod
    async def communicate_with_debugger_server(cls, instance_id, to_send_queue, received_queue, command, message_id):
        """
        Assembles and send the commands, then return the response.
        Send message to the debugger server corresponding to the to_send_queue.
        Return the response from the received_queue.

        Note: ``command`` is modified in place; its ``'id'`` entry is set to
        ``message_id`` before it is sent.
        """
        command['id'] = message_id
        await WebSocket.send_msg_to_debugger_server(instance_id, to_send_queue, command)
        return await WebSocket.recv_msg_of_debugger_server(instance_id, received_queue)

    @classmethod
    async def async_wait_timeout(cls, task, timeout=3):
        """Await ``task`` for at most ``timeout`` seconds.

        Returns the result of ``task``.  If the timeout expires, an error is
        logged and ``None`` is returned instead of raising.
        """
        try:
            return await asyncio.wait_for(task, timeout)
        except asyncio.TimeoutError:
            logging.error('await timeout!')
            return None

    @classmethod
    def _run_cmd(cls, cmd):
        """Log ``cmd``, run it with stdout/stderr captured and return the CompletedProcess."""
        logging.info(' '.join(map(str, cmd)))
        return subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    @classmethod
    def _run_checked(cls, cmd):
        """Run ``cmd`` through ``_run_cmd`` and assert that it exited with return code 0."""
        result = cls._run_cmd(cmd)
        assert result.returncode == 0, (
            f'{cmd} exited with {result.returncode}: '
            f'{result.stderr.decode("utf-8", errors="replace").strip()}'
        )
        return result

    @classmethod
    def hdc_target_mount(cls):
        """Run ``hdc target mount`` and assert that its output is exactly 'Mount finish'.

        Raises:
            AssertionError: if the command prints anything else.
        """
        mount_result = cls._run_cmd(['hdc', 'target', 'mount'])
        # Decode once; undecodable bytes must not mask the real command output.
        output = mount_result.stdout.decode('utf-8', errors='replace').strip()
        logging.info(output)
        assert output == 'Mount finish', f'hdc target mount failed: {output!r}'

    @classmethod
    def hdc_file_send(cls, source, sink):
        """Run ``hdc file send source sink`` and assert the output reports 'FileTransfer finish'.

        Raises:
            AssertionError: if the expected marker is missing from the output.
        """
        result = cls._run_cmd(['hdc', 'file', 'send', source, sink])
        output = result.stdout.decode('utf-8', errors='replace').strip()
        logging.info(output)
        assert 'FileTransfer finish' in output, f'hdc file send failed: {output!r}'

    @classmethod
    def clear_fault_log(cls):
        """Run ``rm`` on ``/data/log/faultlog/faultlogger/*`` through ``hdc shell``.

        Raises:
            AssertionError: if the command exits with a non-zero return code.
        """
        cls._run_checked(['hdc', 'shell', 'rm', '/data/log/faultlog/faultlogger/*'])

    @classmethod
    def save_fault_log(cls, log_path):
        """Receive ``/data/log/faultlog/faultlogger/`` into ``log_path`` via ``hdc file recv``.

        ``log_path`` is created if it does not exist yet.

        Raises:
            AssertionError: if the command exits with a non-zero return code.
        """
        os.makedirs(log_path, exist_ok=True)

        cmd = ['hdc', 'file', 'recv', '/data/log/faultlog/faultlogger/', log_path]
        result = cls._run_cmd(cmd)
        output = result.stdout.decode('utf-8', errors='replace').strip()
        logging.info(output)
        assert result.returncode == 0, f'{cmd} exited with {result.returncode}: {output!r}'

    @classmethod
    def save_hilog(cls, log_path, file_name, debug_on: bool = False):
        """Configure hilog, then stream ``hdc shell hilog`` output into a file.

        Args:
            log_path: directory receiving the log file; created when missing.
            file_name: name of the log file inside ``log_path``; an existing
                file is truncated.
            debug_on: pass ``DEBUG`` to ``hilog -b`` when true, ``INFO`` otherwise.

        Returns:
            A ``(hilog_process, write_thread)`` tuple: the running ``Popen``
            object and the already started thread copying its output to the
            file.  The thread ends when the process output reaches EOF or the
            stream is closed.

        Raises:
            AssertionError: if any hilog configuration command fails.
        """
        os.makedirs(log_path, exist_ok=True)

        # config the hilog
        log_level = 'DEBUG' if debug_on else 'INFO'
        hilog_options = (
            ['-r'],
            ['-G', '16M'],
            ['-Q', 'pidoff'],
            ['-Q', 'domainoff'],
            ['-b', log_level],
        )
        for option in hilog_options:
            cls._run_checked(['hdc', 'shell', 'hilog'] + option)

        # Open the target file before spawning the process so that a failing
        # open cannot leave an orphaned hilog process behind.  os.path.join
        # keeps the path valid on every platform.
        file = os.fdopen(os.open(os.path.join(log_path, file_name),
                                 os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
                                 stat.S_IRUSR | stat.S_IWUSR), 'wb')
        try:
            hilog_process = subprocess.Popen(['hdc', 'shell', 'hilog'],
                                             stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        except Exception:
            file.close()
            raise

        def write():
            # The context manager closes the file on every exit path.
            with file:
                try:
                    for line in iter(hilog_process.stdout.readline, b''):
                        file.write(line)
                        file.flush()
                except ValueError:
                    # Raised by readline once the stream has been closed.
                    logging.info('hilog stream is closed.')

        # create a thread to receive the hilog
        write_thread = threading.Thread(target=write)
        write_thread.start()

        return hilog_process, write_thread

    @classmethod
    def search_hilog(cls, hilog_path, key_world):
        """Return the stripped lines of ``hilog_path`` that contain ``key_world``.

        Args:
            hilog_path: path of the log file; it is read in binary mode.
            key_world: bytes to look for; a ``str`` is encoded as UTF-8 first.

        Returns:
            A list of matching lines as ``bytes`` with surrounding whitespace
            removed, in file order.
        """
        if isinstance(key_world, str):
            # Lines are bytes, so a str key would raise TypeError on ``in``.
            key_world = key_world.encode('utf-8')

        with open(hilog_path, 'rb') as file:
            matched_logs = [line.strip() for line in file if key_world in line]
            logging.debug('Reach the end of the hilog file')
        return matched_logs
175