1import json 2import logging 3import os.path 4import re 5import subprocess 6import time 7import threading 8 9 10class Device: 11 lock = threading.Lock() 12 13 @classmethod 14 def _execute_cmd(cls, cmd): 15 with cls.lock: 16 logging.info(f'[In]{cmd}') 17 rst = subprocess.run(cmd, capture_output=True, shell=True, timeout=30, encoding='utf-8') 18 out_put = rst.stdout or rst.stderr 19 time.sleep(0.5) 20 # 布局的回显太多了,不打印 21 if not re.search(r'cat /data/local/tmp/\S+json', cmd): 22 logging.info(f'[Out]{out_put}') 23 return out_put 24 25 def __init__(self, sn): 26 self.sn = sn 27 self.report_path = '' 28 self.resource_path = '' 29 self.width = 720 30 self.height = 1280 31 self.velocity_range = (200, 40000) 32 self._element_list = [] 33 # self.get_render_size() 34 35 def hdc_shell(self, cmd): 36 out = '' 37 for i in range(3): 38 shell_cmd = f'hdc -l0 -t {self.sn} shell "{cmd}"' 39 out = self._execute_cmd(shell_cmd) 40 if '[Fail]Device not founded or connected' in out: 41 self.restart_hdc_process() 42 time.sleep(5) 43 else: 44 break 45 return out 46 47 def hdc(self, cmd): 48 return self._execute_cmd(f'hdc -l0 -t {self.sn} {cmd}') 49 50 def hdc_version(self): 51 return self._execute_cmd('hdc -v') 52 53 def hdc_list_targets(self): 54 devices = self._execute_cmd('hdc list targets') 55 if 'Empty' in devices: 56 return [] 57 return devices.splitlines() 58 59 def install_hap(self, hap_path, replace=True): 60 """ 61 暗账应用 62 :param hap_path: hap包的路径 63 :param replace: 是否覆盖安装,true覆盖,否则不覆盖 64 :return: 65 """ 66 logging.info('install hap') 67 if replace: 68 cmd = f'app install -r {hap_path}' 69 else: 70 cmd = f'app install {hap_path}' 71 return self.hdc(cmd) 72 73 def install_multi_hap(self, hap_list: list): 74 """ 75 安装更新, 多hap可以指定多个文件路径 76 :param hap_list: 77 :return: 78 """ 79 logging.info('install hap') 80 haps = ' '.join(hap_list) 81 cmd = 'install {}'.format(haps) 82 return self.hdc(cmd) 83 84 def bm_install(self, hap_list: list): 85 """ 86 bm工具安装更新多个hap 87 :param hap_list: 88 :return: 89 """ 90 logging.info('install hap') 91 haps = ' '.join(hap_list) 92 cmd = 'bm install -p {}'.format(haps) 93 return self.hdc_shell(cmd) 94 95 def uninstall_hap(self, bundle_name): 96 # 两个命令都可以 97 logging.info(f'uninstall {bundle_name}') 98 # cmd = 'hdc app uninstall {}'.format(bundle_name) 99 return self.hdc(f'uninstall {bundle_name}') 100 101 def bm_uninstall(self, bundle_name): 102 logging.info(f'uninstall {bundle_name}') 103 return self.hdc_shell(f'bm uninstall -n {bundle_name}') 104 105 def hdc_file_send(self, local, remote): 106 logging.info('send file to device') 107 return self.hdc(f'file send "{local}" "{remote}"') 108 109 def hdc_file_recv(self, remote, local=''): 110 logging.info('recv file from device') 111 local = local or self.report_path 112 return self.hdc(f'file recv "{remote}" "{local}"') 113 114 def hilog(self): 115 logging.info('hilog') 116 return self.hdc('hilog') 117 118 def get_udid(self): 119 logging.info('get udid') 120 return self.hdc('bm get --udid') 121 122 def kill_hdc_process(self): 123 logging.info('kill hdc process') 124 return self._execute_cmd('hdc kill') 125 126 def restart_hdc_process(self): 127 # logging.info('restart hdc process') 128 return self._execute_cmd('hdc start -r') 129 130 def reboot(self): 131 # logging.info('reboot device') 132 return self.hdc_shell('reboot') 133 134 def start_ability(self, bundle_name, ability_name): 135 # logging.info(f'start {bundle_name} application') 136 rst = self.hdc_shell(f'aa start -b {bundle_name} -a {ability_name}') 137 time.sleep(2) 138 return rst 139 140 def force_stop(self, bundle_name): 141 # logging.info(f'停掉{bundle_name}应用') 142 return self.hdc_shell(f'aa force-stop {bundle_name}') 143 144 def clean_app_data(self, bundle_name): 145 """ 146 清除应用缓存和数据, -c缓存, -d应用数据 147 :param bundle_name: 148 :return: 149 """ 150 if not bundle_name: 151 return 152 # logging.info(f'清理{bundle_name}应用数据和缓存') 153 return self.hdc_shell(f'bm clean -n {bundle_name} -c -d') 154 155 def disable_app(self, bundle_name): 156 """ 157 禁止应用,应用在桌面消失 158 :param bundle_name: 159 :return: 160 """ 161 # logging.info(f'禁止{bundle_name}应用,应用在桌面消失') 162 return self.hdc_shell(f'bm disable -n {bundle_name}') 163 164 def enable_app(self, bundle_name): 165 """ 166 允许应用,应用显示在桌面上 167 :param bundle_name: 168 :return: 169 """ 170 # logging.info(f'允许{bundle_name}应用,应用显示在桌面上') 171 return self.hdc_shell(f'bm enable -n {bundle_name}') 172 173 def dump_hap_configuration(self, bundle_name): 174 """ 175 查看应用配置信息 176 :param bundle_name: 177 :return: 178 """ 179 # logging.info(f'查看{bundle_name}应用配置信息') 180 return self.hdc_shell(f'bm dump -n {bundle_name}') 181 182 def stop_permission(self): 183 # self.click(504, 708) 184 # logging.info(f'消掉权限请求的弹窗') 185 focus_win = self.get_focus_window() 186 if 'permissionDialog1' in focus_win: 187 # logging.info(f'消掉权限请求的弹窗') 188 self.refresh_layout() 189 allow = self.get_element_by_condition(condition={'text': '允许', 'type': 'Button'}) 190 self.click_element(allow) 191 time.sleep(2) 192 else: 193 logging.info(f'current window: {focus_win}') 194 # return self.force_stop('com.ohos.permissionmanager') 195 196 def click(self, x: int, y: int): 197 """ 198 模拟触摸按下 199 :param x: 200 :param y: 201 :return: 202 """ 203 # logging.info(f'点击({x},{y})坐标') 204 return self.hdc_shell(f'uinput -M -m {x} {y} -c 0') 205 # return self.hdc_shell(f'uinput -T -c {x} {y}') 206 # return self.hdc_shell(f'uitest uiInput click {x} {y}') 207 208 def click_element(self, e): 209 x, y = self.center_of_element(e) 210 return self.click(x, y) 211 212 def double_click(self, x, y): 213 # logging.info(f'双击({x},{y})坐标') 214 return self.hdc_shell(f'uitest uiInput doubleClick {x} {y}') 215 216 def double_click_element(self, e): 217 x, y = self.center_of_element(e) 218 return self.double_click(x, y) 219 220 def long_click(self, x, y): 221 # logging.info(f'长按({x},{y})坐标') 222 return self.hdc_shell(f'uitest uiInput longClick {x} {y}') 223 224 def long_click_element(self, e): 225 x, y = self.center_of_element(e) 226 return self.long_click(x, y) 227 228 def dirc_fling(self, direct=0): 229 """ 230 模拟指定方向滑动 231 :param direct:direction (可选参数,滑动方向,可选值: [0,1,2,3], 滑动方向: [左,右,上,下],默认值: 0) 232 swipeVelocityPps_ (可选参数,滑动速度,取值范围: 200-40000, 默认值: 600, 单位: px/s) 233 stepLength(可选参数,滑动步长,默认值:滑动距离/50, 单位: px) 234 :return: 235 """ 236 direct_map = { 237 0: '左', 238 1: '右', 239 2: '上', 240 3: '下', 241 } 242 if direct not in direct_map.keys(): 243 direct = 0 244 # logging.info(f'向 {direct_map.get(direct)} 滑动') 245 return self.hdc_shell(f'uitest uiInput dircFling {direct}') 246 247 def swipe(self, from_x, from_y, to_x, to_y, velocity=600): 248 """ 249 模拟慢滑操作 250 :param from_x:(必选参数,滑动起点x坐标) 251 :param from_y:(必选参数,滑动起点y坐标) 252 :param to_x:(必选参数,滑动终点x坐标) 253 :param to_y:(必选参数,滑动终点y坐标) 254 :param velocity: (可选参数,滑动速度,取值范围: 200-40000, 默认值: 600, 单位: px/s) 255 :return: 256 """ 257 # 保证数据取值范围合理 258 # logging.info(f'从({from_x},{from_y})滑动到({to_x},{to_y}),滑动速度:{velocity}px/s') 259 return self.hdc_shell(f'uitest uiInput swipe {from_x} {from_y} {to_x} {to_y} {velocity}') 260 261 def fling(self, from_x, from_y, to_x, to_y, velocity=600): 262 """ 263 模拟快滑操作 264 :param from_x:(必选参数,滑动起点x坐标) 265 :param from_y:(必选参数,滑动起点y坐标) 266 :param to_x:(必选参数,滑动终点x坐标) 267 :param to_y:(必选参数,滑动终点y坐标) 268 :param velocity: (可选参数,滑动速度,取值范围: 200-40000, 默认值: 600, 单位: px/s) 269 :return: 270 """ 271 # 保证数据取值范围合理 272 # 保证数据取值范围合理 273 # logging.info(f'从({from_x},{from_y})快速滑动到({to_x},{to_y})') 274 return self.hdc_shell(f'uitest uiInput fling {from_x} {from_y} {to_x} {to_y}') 275 276 def drag(self, from_x, from_y, to_x, to_y, velocity=600): 277 """ 278 拖拽,从(x1, y1)拖拽到(x2, y2) 279 :param from_x:(必选参数,滑动起点x坐标) 280 :param from_y:(必选参数,滑动起点y坐标) 281 :param to_x:(必选参数,滑动终点x坐标) 282 :param to_y:(必选参数,滑动终点y坐标) 283 :param velocity: (可选参数,滑动速度,取值范围: 200-40000, 默认值: 600, 单位: px/s) 284 """ 285 # logging.info(f'从({from_x},{from_y})拖到({to_x},{to_y}),拖动速度:{velocity}px/s') 286 return self.hdc_shell(f'uitest uiInput drag {from_x} {from_y} {to_x} {to_y} {velocity}') 287 288 def key_event(self, key_code): 289 # logging.info(f'按下{key_code}键') 290 return self.hdc_shell(f'uitest uiInput keyEvent {key_code}') 291 292 def go_home(self): 293 return self.key_event('Home') 294 295 def go_back(self): 296 return self.key_event('Back') 297 298 def press_power_key(self): 299 return self.key_event('Power') 300 301 def press_recent_key(self): 302 return self.click(514, 1243) 303 # return self.key_event('2078') 304 305 def clear_recent_task(self): 306 # logging.info('清理最近的任务') 307 self.press_recent_key() 308 time.sleep(0.5) 309 self.press_clear_btn() 310 311 def press_clear_btn(self): 312 self.click(360, 1170) 313 314 def input_text(self, x, y, text=''): 315 # logging.info(f'向({x, y})坐标处输入“{text}”') 316 return self.hdc_shell(f'uitest uiInput inputText {x} {y} {text}') 317 318 def wakeup(self): 319 # logging.info('点亮屏幕') 320 return self.hdc_shell('power-shell wakeup') 321 322 def suspend(self): 323 # logging.info('熄灭屏幕') 324 return self.hdc_shell('power-shell suspend') 325 326 def set_power_mode(self, mode='602'): 327 """ 328 设置电源模式 329 :param mode:600 normal mode 正常模式 330 601 power save mode省电模式 331 602 performance mode性能模式,屏幕会常亮 332 603 extreme power save mode极端省电模式 333 :return: 334 """ 335 power_map = { 336 '600': '正常模式', 337 '601': '省电模式', 338 '602': '性能模式', 339 '603': '极端省电模式', 340 } 341 if mode not in power_map.keys(): 342 mode = '602' 343 # logging.info(f'设置电源为{power_map.get(mode)}') 344 return self.hdc_shell(f'power-shell setmode {mode}') 345 346 def display_screen_state(self): 347 # logging.info('获取屏幕点亮状态') 348 return self.hdc_shell('hidumper -s 3308') 349 350 def get_render_size(self): 351 # logging.info('获取屏幕分辨率') 352 rst = self.hdc_shell('hidumper -s RenderService -a screen') 353 xy = re.findall(r'render size: (\d+)x(\d+)', rst)[0] 354 self.width, self.height = [int(i) for i in xy] 355 356 def dump_layout(self, file_name=''): 357 # logging.info('获取当前页面布局') 358 if file_name: 359 file_path = '/data/local/tmp/' + file_name 360 dump_rst = self.hdc_shell(f'uitest dumpLayout -p {file_path}') 361 else: 362 dump_rst = self.hdc_shell(f'uitest dumpLayout') 363 layout_file = dump_rst.split(':')[1].strip() 364 return layout_file 365 366 def save_layout_to_local(self, file_name=''): 367 layout_file = self.dump_layout(file_name) 368 self.hdc_file_recv(layout_file) 369 370 def refresh_layout(self, file_name=''): 371 """ 372 :param file_name: 文件名不包含路径 373 :return: 374 """ 375 file_name = file_name or 'tmp_layout.json' 376 tmp_file = self.dump_layout(file_name) 377 json_data = json.loads(self.hdc_shell(f'cat {tmp_file}')) 378 self._element_list = self._parse_attribute_nodes(json_data) 379 # 将控件按从上到下,从左到右的顺序排列 380 self._element_list.sort(key=lambda e: [self.center_of_element(e)[1], self.center_of_element(e)[0]]) 381 382 def snapshot_display(self, jpeg=''): 383 """jpeg必须在/data/local/tmp目录""" 384 # logging.info('获取当前页面截图') 385 if jpeg: 386 jpeg = '/data/local/tmp/' + jpeg 387 shot_rst = self.hdc_shell(f'snapshot_display -f {jpeg}') 388 else: 389 shot_rst = self.hdc_shell('snapshot_display') 390 if 'success' not in shot_rst: 391 return '' 392 return re.findall(r'write to(.*)as jpeg', shot_rst)[0].strip() 393 394 def save_snapshot_to_local(self, file_name=''): 395 tmp_file = self.snapshot_display(file_name) 396 save_file = os.path.join(self.report_path, tmp_file.split('/')[-1]) 397 self.hdc_file_recv(tmp_file, self.report_path) 398 return save_file 399 400 def clear_local_tmp(self): 401 self.hdc_shell('rm -rf /data/local/tmp/*') 402 403 def set_screen_timeout(self, timeout=600): 404 """ 405 设置屏幕超时时间 406 :param timeout: 单位s 407 :return: 408 """ 409 # logging.info(f'设置屏幕{timeout}s无操作后休眠') 410 return self.hdc_shell(f'power-shell timeout -o {timeout * 1000}') 411 412 def rm_faultlog(self): 413 # logging.info('删除fault log') 414 self.hdc_shell('rm -f /data/log/faultlog/SERVICE_BLOCK*') 415 self.hdc_shell('rm -f /data/log/faultlog/appfreeze*') 416 self.hdc_shell('rm -f /data/log/faultlog/temp/cppcrash*') 417 self.hdc_shell('rm -f /data/log/faultlog/faultlogger/jscrash*') 418 self.hdc_shell('rm -f /data/log/faultlog/faultlogger/appfreeze*') 419 self.hdc_shell('rm -f /data/log/faultlog/faultlogger/cppcrash*') 420 421 def start_hilog(self): 422 # logging.info('开启hilog') 423 self.hdc_shell('hilog -w stop;hilog -w clear') 424 self.hdc_shell('hilog -r;hilog -b INFO;hilog -w start -l 10M -n 1000') 425 426 def stop_and_collect_hilog(self, local_dir=''): 427 # logging.info('停止并收集hilog') 428 self.hdc_shell('hilog -w stop') 429 self.hdc_file_recv('/data/log/hilog/', local_dir) 430 431 def unlock(self): 432 """ 433 滑动解锁 434 :return: 435 """ 436 # logging.info('解锁屏幕') 437 return self.dirc_fling(2) 438 439 def assert_process_running(self, process): 440 # logging.info(f'检查{process}进程是否存在') 441 rst = self.hdc_shell(f'ps -ef | grep -w {process} | grep -v grep') 442 assert process in rst, f'process {process} not exist' 443 444 def get_pid(self, process): 445 # logging.info(f'获取{process}进程PID') 446 return self.hdc_shell(f'pidof {process}').strip() 447 448 def get_wifi_status(self): 449 # hidumper -ls查看所有hidumper服务 450 # logging.info('获取wifi状态') 451 status = self.hdc_shell('hidumper -s WifiDevice') 452 active_state = re.findall(r'WiFi active state: (.*)', status)[0].strip() 453 connection_status = re.findall(r'WiFi connection status: (.*)', status)[0].strip() 454 455 scan_status = self.hdc_shell('hidumper -s WifiScan') 456 is_scan_running = re.findall(r'Is scan service running: (.*)', scan_status)[0].strip() 457 return { 458 'active': active_state, 459 'connected': connection_status, 460 'scanning': is_scan_running 461 } 462 463 def dump_windows_manager_service(self): 464 rst = self.hdc_shell("hidumper -s WindowManagerService -a '-a'") 465 return rst 466 467 def get_focus_window(self): 468 time.sleep(1) 469 text = self.dump_windows_manager_service() 470 focus_win = re.findall(r'Focus window: (\d+)', text)[0].strip() 471 win_id_index = 3 472 focus_window_name = '' 473 for line in text.splitlines(): 474 if line.startswith('Focus window'): 475 break 476 wms = re.match(r'\S*\s*(\d+\s+){' + str(win_id_index) + '}', line) 477 if wms: 478 data = wms.group().strip().split() 479 if data[win_id_index] == focus_win: 480 focus_window_name = data[0] 481 break 482 # logging.info('当前聚焦的窗口为:{}'.format(focus_window_name)) 483 return focus_window_name 484 485 def get_win_id(self, window_name): 486 text = self.dump_windows_manager_service() 487 win = re.search(window_name + r'\s*(\d+\s+){3}', text) 488 if not win: 489 return 490 win_id = win.group().split()[-1] 491 return win_id 492 493 def get_navigationb_winid(self): 494 return self.get_win_id('SystemUi_NavigationB') 495 496 def is_soft_keyboard_on(self): 497 if self.soft_keyboard(): 498 return True 499 return False 500 501 def soft_keyboard(self): 502 text = self.dump_windows_manager_service() 503 keyboard = re.search(r'softKeyboard1\s*(\d+\s+){8}\[\s+(\d+\s+){4}]', text) 504 if not keyboard: 505 return '' 506 return keyboard.group() 507 508 def get_elements_by_text(self, text): 509 ems = [] 510 for e in self._element_list: 511 if e.get('text') == text: 512 ems.append(e) 513 return ems 514 515 def get_element_by_text(self, text, index=0): 516 ems = self.get_elements_by_text(text) 517 if not ems: 518 return 519 return ems[index] 520 521 def assert_text_exist(self, text): 522 element = self.get_elements_by_text(text) 523 rst = 'Yes' if element else 'No' 524 logging.info('check [text]="{}" exist? [{}]'.format(text, rst)) 525 assert element 526 527 def get_elements_by_type(self, _type): 528 ems = [] 529 for e in self._element_list: 530 if e.get('type') == _type: 531 ems.append(e) 532 return ems 533 534 def get_element_by_type(self, _type, index=0): 535 ems = self.get_elements_by_type(_type) 536 if not ems: 537 return 538 return ems[index] 539 540 def assert_type_exist(self, _type): 541 element = self.get_elements_by_type(_type) 542 rst = 'Yes' if element else 'No' 543 logging.info('check [type]="{}"exist? [{}]'.format(_type, rst)) 544 assert element 545 546 def get_elements_by_key(self, key): 547 ems = [] 548 for e in self._element_list: 549 if e.get('key') == key: 550 ems.append(e) 551 return ems 552 553 def get_element_by_key(self, key, index=0): 554 ems = self.get_elements_by_key(key) 555 if not ems: 556 return 557 return ems[index] 558 559 def assert_key_exist(self, key): 560 element = self.get_elements_by_key(key) 561 rst = 'Yes' if element else 'No' 562 logging.info('check [key]="{}"exist? [{}]'.format(key, rst)) 563 assert element 564 565 def get_elements_by_condition(self, condition: dict): 566 ems = [] 567 for e in self._element_list: 568 cs = set(condition.items()) 569 if cs.issubset(set(e.items())): 570 ems.append(e) 571 return ems 572 573 def get_element_by_condition(self, condition, index=0): 574 ems = self.get_elements_by_condition(condition) 575 if not ems: 576 return 577 return ems[index] 578 579 def set_brightness(self, value=102): 580 return self.hdc_shell(f'power-shell display -s {value}') 581 582 @staticmethod 583 def center_of_element(e): 584 assert e, 'element not exist' 585 bounds = e.get('bounds') 586 x1, y1, x2, y2 = [int(i) for i in re.findall(r'\d+', bounds)] 587 x = (x1 + x2) // 2 588 y = (y1 + y2) // 2 589 return x, y 590 591 def _parse_attribute_nodes(self, json_obj, attr_list=None): 592 if attr_list is None: 593 attr_list = [] 594 595 if isinstance(json_obj, dict): 596 for key, value in json_obj.items(): 597 if key == 'attributes' and isinstance(value, dict): 598 attr_list.append(value) 599 elif isinstance(value, (dict, list)): 600 self._parse_attribute_nodes(value, attr_list) 601 elif isinstance(json_obj, list): 602 for item in json_obj: 603 self._parse_attribute_nodes(item, attr_list) 604 return attr_list 605