#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Copyright (c) 2024 Huawei Device Co., Ltd. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. Description: Utils for action words. """ import asyncio import os import re import subprocess import threading from typing import Union from hypium import SystemUI, BY, MatchPattern from fport import Fport from taskpool import TaskPool from toolchain_websocket import ToolchainWebSocket class TimeRecord: def __init__(self, expected_time, actual_time): self.expected_time = expected_time self.actual_times = [actual_time] def avg_actual_time(self): return sum(self.actual_times) / len(self.actual_times) def max_actual_time(self): return max(self.actual_times) def min_actual_time(self): return min(self.actual_times) def add_actual_time(self, actual_time): self.actual_times.append(actual_time) def avg_proportion(self): return self.avg_actual_time * 100 / self.expected_time def rounds(self): return len(self.actual_times) def mid_actual_time(self): self.actual_times.sort() return self.actual_times[len(self.actual_times) // 2] class CommonUtils(object): def __init__(self, driver): self.driver = driver @staticmethod async def communicate_with_debugger_server(websocket, connection, command, message_id): ''' Assembles and send the commands, then return the response. Send message to the debugger server corresponding to the to_send_queue. Return the response from the received_queue. ''' command['id'] = message_id await websocket.send_msg_to_debugger_server(connection.instance_id, connection.send_msg_queue, command) response = await websocket.recv_msg_of_debugger_server(connection.instance_id, connection.received_msg_message) return response @staticmethod def get_custom_protocols(): protocols = ["removeBreakpointsByUrl", "setMixedDebugEnabled", "replyNativeCalling", "getPossibleAndSetBreakpointByUrl", "dropFrame", "setNativeRange", "resetSingleStepper", "callFunctionOn", "smartStepInto", "saveAllPossibleBreakpoints"] return protocols @staticmethod def message_id_generator(): message_id = 1 while True: yield message_id message_id += 1 @staticmethod def assert_equal(actual, expected): ''' 判断actual和expected是否相同,若不相同,则抛出异常 ''' assert actual == expected, f"\nExpected: {expected}\nActual: {actual}" @staticmethod def get_variables_from_properties(properties, prefix_name): ''' properties是Runtime.getProperties协议返回的变量信息 该方法会根据prefix_name前缀匹配properties中的变量名,符合的变量会以字典形式返回变量名和相关描述 描述首先采用description中的内容,但会删掉其中的哈希值,没有description则会采用subtype或type中的内容 ''' variables = {} for var in properties: if not var['name'].startswith(prefix_name): continue name = var['name'] value = var['value'] description = value.get('description') if description is not None: index_of_at = description.find('@') if index_of_at == -1: variables[name] = description continue variables[name] = description[:index_of_at] index_of_bracket = description.find('[', index_of_at + 1) if index_of_bracket != -1: variables[name] += description[index_of_bracket:] else: subtype = value.get('subtype') variables[name] = subtype if subtype is not None else value.get('type') return variables def get_pid(self, bundle_name): ''' 获取bundle_name对应的pid ''' pid = self.driver.shell("pidof " + bundle_name).strip() if not pid.isdigit(): return 0 self.driver.log_info(f'pid of {bundle_name}: ' + pid) return int(pid) def attach(self, bundle_name): ''' 通过bundle_name使指定应用进入调试模式 ''' attach_result = self.driver.shell(f"aa attach b {bundle_name}").strip() self.driver.log_info(attach_result) self.assert_equal(attach_result, 'attach app debug successfully.') def connect_server(self, config): ''' 根据config连接ConnectServer ''' fport = Fport(self.driver) fport.clear_fport() connect_server_port = fport.fport_connect_server(config['connect_server_port'], config['pid'], config['bundle_name']) assert connect_server_port > 0, 'Failed to fport connect server for 3 times, the port is very likely occupied' config['connect_server_port'] = connect_server_port config['websocket'] = ToolchainWebSocket(self.driver, config['connect_server_port'], config['debugger_server_port'], config.get('print_protocol', True)) config['taskpool'] = TaskPool() def hot_reload(self, hqf_path: Union[str, list]): ''' 根据hqf_path路径对应用进行热重载 ''' assert isinstance(hqf_path, (str, list)), 'Tyep of hqf_path is NOT string or list!' if isinstance(hqf_path, str): cmd = f'bm quickfix -a -f {hqf_path} -d' elif isinstance(hqf_path, list): cmd = f'bm quickfix -a -f {" ".join(hqf_path)} -d' self.driver.log_info('hot reload: ' + cmd) result = self.driver.shell(cmd).strip() self.driver.log_info(result) self.assert_equal(result, 'apply quickfic succeed.') async def async_wait_timeout(self, task, timeout=3): ''' 在timeout内执行task异步任务,若执行超时则抛出异常 ''' try: result = await asyncio.wait_for(task, timeout) return result except asyncio.TimeoutError: self.driver.log_error('await timeout!') def hdc_target_mount(self): ''' 挂载设备文件系统 ''' cmd = 'target mount' self.driver.log_info('Mount finish: ' + cmd) result = self.driver.hdc(cmd).strip() self.driver.log_info(result) self.assert_equal(result, 'Mount finish') def hdc_file_send(self, source, sink): ''' 将source中的文件发送到设备的sink路径下 ''' cmd = f'file send {source} {sink}' self.driver.log_info(cmd) result = self.driver.hdc(cmd) self.driver.log_info(result) assert 'FileTransfer finish' in result, result def clear_fault_log(self): ''' 清楚故障日志 ''' cmd = 'rm /data/log/faultlog/faultlogger/*' self.driver.log_info(cmd) result = self.driver.shell(cmd) self.driver.log_info(result) assert 'successfully' in result, result def save_fault_log(self, log_path): ''' 保存故障日志到log_path ''' if not os.path.exists(log_path): os.makedirs(log_path) cmd = f'file recv /data/log/faultlog/faultlogger/ {log_path}' self.driver.log_info(cmd) result = self.driver.hdc(cmd) self.driver.log_info(result) assert 'successfully' in result, result class UiUtils(object): def __init__(self, driver): self.driver = driver def get_screen_size(self): ''' 获取屏幕大小 ''' screen_info = self.driver.shell(f"hidumper -s RenderService -a screen") match = re.search(r'physical screen resolution: (\d+)x(\d+)', screen_info) # 新版本镜像中screen_info信息格式有变,需要用下方正则表达式获取 if match is None: match = re.search(r'physical resolution=(\d+)x(\d+)', screen_info) assert match is not None, f"screen_info is incorrect: {screen_info}" return int(match.group(1)), int(match.group(2)) def click(self, x, y): ''' 点击屏幕指定坐标位置 ''' click_result = self.driver.shell(f"uinput -T -c {x} {y}") self.driver.log_info(click_result) assert "click coordinate" in click_result, f"click_result is incorrect: {click_result}" def click_on_middle(self): ''' 点击屏幕中心 ''' width, height = self.get_screen_size() middle_x = width // 2 middle_y = height // 2 self.click(middle_x, middle_y) def back(self): ''' 返回上一步 ''' cmd = 'uitest uiInput keyEvent Back' self.driver.log_info('click the back button: ' + cmd) result = self.driver.shell(cmd).strip() self.driver.log_info(result) CommonUtils.assert_equal(result, 'No Error') def keep_awake(self): ''' 保持屏幕常亮,要在屏幕亮起时使用该方法才有效 ''' cmd = 'power-shell wakeup' result = self.driver.shell(cmd).strip() assert "WakeupDevice is called" in result, result cmd = 'power-shell timeout -o 214748647' result = self.driver.shell(cmd).strip() assert "Override screen off time to 214748647" in result, result cmd = 'power-shell setmode 602' result = self.driver.shell(cmd).strip() assert "Set Mode Success!" in result, result self.driver.log_info('Keep the screen awake Success!') def open_control_center(self): ''' 滑动屏幕顶部右侧打开控制中心 ''' width, height = self.get_screen_size() start = (int(width * 0.75), 20) end = (int(width * 0.75), int(height * 0.3)) cmd = f"uinput -T -m {start[0]} {start[1]} {end[0]} {end[1]} 500" self.driver.log_info('open control center') result = self.driver.shell(cmd) self.driver.log_info(result) def click_location_component(self): ''' 点击控制中心中的位置控件 ''' self.open_control_center() self.driver.wait(1) width, height = self.get_screen_size() self.click(int(width * 0.2), int(height * 0.95)) self.back() def disable_location(self): ''' 关闭位置信息(GPS),不能在异步任务中使用 ''' self.driver.wait(1) SystemUI.open_control_center(self.driver) comp = self.driver.find_component(BY.key("ToggleBaseComponent_Image_location", MatchPattern.CONTAINS)) result = comp.getAllProperties() try: bg = result.backgroudColor value = int("0x" + bg[3:], base=16) # 读取背景颜色判断开启或关闭 if value < 0xffffff: comp.click() self.driver.wait(1) self.driver.press_back() except Exception as e: raise RuntimeError("Fail to disable location") def enable_location(self): ''' 开启位置信息(GPS),不能在异步任务中使用 ''' self.driver.wait(1) SystemUI.open_control_center(self.driver) comp = self.driver.find_component(BY.key("ToggleBaseComponent_Image_location", MatchPattern.CONTAINS)) result = comp.getAllProperties() try: bg = result.backgroudColor value = int("0x" + bg[3:], base=16) # 读取背景颜色判断开启或关闭 if value == 0xffffff: comp.click() self.driver.wait(1) self.driver.press_back() except Exception as e: raise RuntimeError("Fail to disable location") class PerformanceUtils(object): def __init__(self, driver): self.driver = driver self.time_data = {} self.file_path_hwpower_genie_engine = "/system/app" self.file_name_hwpower_genie_engine = "HwPowerGenieEngine3" self.cpu_num_list = [0] * 3 self.cpu_start_id = [0] * 3 self.total_cpu_num = 0 self.cpu_cluster_num = 0 self.prev_mode = 0 self.board_ipa_support = False self.perfg_version = "0.0" def show_performace_data_in_html(self): ''' 将性能数据以html格式展示出来 ''' header = ['测试用例', '执行次数', '期望执行时间(ms)', '最大执行时间(ms)', '最小执行时间(ms)', '执行时间中位数(ms)', '平均执行时间(ms)', '平均执行时间/期望执行时间(%)'] content = [] failed_cases = [] for key, value in self.time_data.items(): if value.avg_proportion() >= 130: failed_cases.append(key) content.append([key, value.rounds(), value.expected_time, value.max_actual_time(), value.min_actual_time(), value.mid_actual_time(), f'{value.avg_actual_time()}:.2f', f'{value.avg_proportion()}:.2f']) table_html = '
| {i} | ' row_html += '