#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Copyright (c) 2024 Huawei Device Co., Ltd.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Description: MISC action words.
"""

import asyncio
import logging
import os
import stat
import subprocess
import threading

from aw.websocket import WebSocket


class Utils(object):
    """Miscellaneous helpers: message ids, debugger messaging, and hdc/hilog commands."""

    @classmethod
    def message_id_generator(cls):
        """Yield consecutive integer message ids, starting at 1 and never ending."""
        message_id = 1
        while True:
            yield message_id
            message_id += 1

    @classmethod
    def get_custom_protocols(cls):
        """Return a new list holding the names of the custom protocols."""
        protocols = ["removeBreakpointsByUrl",
                     "setMixedDebugEnabled",
                     "replyNativeCalling",
                     "getPossibleAndSetBreakpointByUrl",
                     "dropFrame",
                     "setNativeRange",
                     "resetSingleStepper",
                     "callFunctionOn",
                     "smartStepInto"]
        return protocols

    @classmethod
    async def communicate_with_debugger_server(cls, instance_id, to_send_queue, received_queue, command, message_id):
        """
        Assembles and send the commands, then return the response.
        Send message to the debugger server corresponding to the to_send_queue.
        Return the response from the received_queue.

        Note: ``command`` is modified in place; its ``'id'`` entry is set to
        ``message_id`` before it is sent.
        """
        command['id'] = message_id
        await WebSocket.send_msg_to_debugger_server(instance_id, to_send_queue, command)
        response = await WebSocket.recv_msg_of_debugger_server(instance_id, received_queue)
        return response

    @classmethod
    async def async_wait_timeout(cls, task, timeout=3):
        """
        Await ``task`` for at most ``timeout`` seconds.

        Return the result of ``task``. If the timeout expires, log an error and
        return None instead of raising; ``asyncio.wait_for`` cancels the
        awaitable in that case.
        """
        try:
            return await asyncio.wait_for(task, timeout)
        except asyncio.TimeoutError:
            logging.error('await timeout!')
            return None

    @classmethod
    def _run_cmd(cls, cmd):
        """Log ``cmd``, run it with stdout and stderr captured, and return the CompletedProcess."""
        logging.info(' '.join(cmd))
        return subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    @classmethod
    def _decode_output(cls, raw):
        """Decode captured bytes as UTF-8 (undecodable bytes are replaced) and strip whitespace."""
        return raw.decode('utf-8', errors='replace').strip()

    @classmethod
    def _run_checked(cls, cmd):
        """Run ``cmd`` via ``_run_cmd`` and assert that it exited with return code 0."""
        result = cls._run_cmd(cmd)
        assert result.returncode == 0, \
            f"'{' '.join(cmd)}' exited with {result.returncode}: {cls._decode_output(result.stderr)}"
        return result

    @classmethod
    def hdc_target_mount(cls):
        """
        Run ``hdc target mount`` and check that it reports success.

        Raises AssertionError unless the stripped stdout is exactly 'Mount finish'.
        """
        result = cls._run_cmd(['hdc', 'target', 'mount'])
        output = cls._decode_output(result.stdout)
        logging.info(output)
        assert output == 'Mount finish', f'hdc target mount failed: {output!r}'

    @classmethod
    def hdc_file_send(cls, source, sink):
        """
        Run ``hdc file send source sink`` and check that it reports success.

        Raises AssertionError unless stdout contains 'FileTransfer finish'.
        """
        result = cls._run_cmd(['hdc', 'file', 'send', source, sink])
        output = cls._decode_output(result.stdout)
        logging.info(output)
        assert 'FileTransfer finish' in output, f'hdc file send failed: {output!r}'

    @classmethod
    def clear_fault_log(cls):
        """
        Run ``rm`` on ``/data/log/faultlog/faultlogger/*`` through ``hdc shell``.

        Raises AssertionError if the command exits with a non-zero return code.
        """
        cls._run_checked(['hdc', 'shell', 'rm', '/data/log/faultlog/faultlogger/*'])

    @classmethod
    def save_fault_log(cls, log_path):
        """
        Receive ``/data/log/faultlog/faultlogger/`` into ``log_path`` with ``hdc file recv``.

        ``log_path`` is created if it does not exist yet.
        Raises AssertionError if the command exits with a non-zero return code.
        """
        os.makedirs(log_path, exist_ok=True)

        result = cls._run_cmd(['hdc', 'file', 'recv', '/data/log/faultlog/faultlogger/', log_path])
        output = cls._decode_output(result.stdout)
        logging.info(output)
        assert result.returncode == 0, \
            f'hdc file recv exited with {result.returncode}: {output!r}'

    @classmethod
    def save_hilog(cls, log_path, file_name, debug_on: bool = False):
        """
        Configure hilog, then stream ``hdc shell hilog`` output into a file.

        The output file is ``file_name`` inside ``log_path`` (created if
        missing); an existing file is truncated. A background thread copies
        the process output line by line and closes the file when the stream
        ends.

        :param log_path: directory that receives the log file.
        :param file_name: name of the log file inside ``log_path``.
        :param debug_on: pass 'DEBUG' instead of 'INFO' to ``hilog -b``.
        :return: tuple ``(hilog_process, write_thread)``; the caller owns both
            and is responsible for stopping the process and joining the thread.
        :raises AssertionError: if any hilog configuration command fails.
        """
        os.makedirs(log_path, exist_ok=True)

        # config the hilog
        level = 'DEBUG' if debug_on else 'INFO'
        config_cmds = (
            ['hdc', 'shell', 'hilog', '-r'],
            ['hdc', 'shell', 'hilog', '-G', '16M'],
            ['hdc', 'shell', 'hilog', '-Q', 'pidoff'],
            ['hdc', 'shell', 'hilog', '-Q', 'domainoff'],
            ['hdc', 'shell', 'hilog', '-b', level],
        )
        for cmd in config_cmds:
            cls._run_checked(cmd)

        # Open the file before spawning hilog so a failure here cannot leave
        # an orphaned subprocess behind. os.path.join keeps the path portable.
        hilog_file = os.fdopen(os.open(os.path.join(log_path, file_name),
                                       os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
                                       stat.S_IRUSR | stat.S_IWUSR), 'wb')
        try:
            hilog_process = subprocess.Popen(['hdc', 'shell', 'hilog'],
                                             stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        except Exception:
            hilog_file.close()
            raise

        def write():
            try:
                for line in iter(hilog_process.stdout.readline, b''):
                    hilog_file.write(line)
                    hilog_file.flush()
            except ValueError:
                # Reading from an already closed stdout pipe raises ValueError.
                logging.info('hilog stream is closed.')
            finally:
                # Close the file on every exit path, including unexpected errors.
                hilog_file.close()

        # create a thread to receive the hilog
        write_thread = threading.Thread(target=write)
        write_thread.start()

        return hilog_process, write_thread

    @classmethod
    def search_hilog(cls, hilog_path, key_world):
        """
        Return every line of the file at ``hilog_path`` that contains ``key_world``.

        The file is read in binary mode. ``key_world`` may be bytes, or a str,
        which is encoded as UTF-8 before matching.

        :return: list of matching lines as bytes, stripped of surrounding
            whitespace, in file order.
        """
        needle = key_world.encode('utf-8') if isinstance(key_world, str) else key_world
        with open(hilog_path, 'rb') as file:
            matched_logs = [line.strip() for line in file if needle in line]
        logging.debug('Reach the end of the hilog file')
        return matched_logs