• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import json
2import logging
3import os.path
4import re
5import subprocess
6import time
7import threading
8
9
10class Device:
11    lock = threading.Lock()
12
13    @classmethod
14    def _execute_cmd(cls, cmd):
15        with cls.lock:
16            logging.info(f'[In]{cmd}')
17            rst = subprocess.run(cmd, capture_output=True, shell=True, timeout=30, encoding='utf-8')
18            out_put = rst.stdout or rst.stderr
19            time.sleep(0.5)
20            # 布局的回显太多了,不打印
21            if not re.search(r'cat /data/local/tmp/\S+json', cmd):
22                logging.info(f'[Out]{out_put}')
23            return out_put
24
25    def __init__(self, sn):
26        self.sn = sn
27        self.report_path = ''
28        self.resource_path = ''
29        self.width = 720
30        self.height = 1280
31        self.velocity_range = (200, 40000)
32        self._element_list = []
33        # self.get_render_size()
34
35    def hdc_shell(self, cmd):
36        out = ''
37        for i in range(3):
38            shell_cmd = f'hdc -l0 -t {self.sn} shell "{cmd}"'
39            out = self._execute_cmd(shell_cmd)
40            if '[Fail]Device not founded or connected' in out:
41                self.restart_hdc_process()
42                time.sleep(5)
43            else:
44                break
45        return out
46
47    def hdc(self, cmd):
48        return self._execute_cmd(f'hdc -l0 -t {self.sn} {cmd}')
49
50    def hdc_version(self):
51        return self._execute_cmd('hdc -v')
52
53    def hdc_list_targets(self):
54        devices = self._execute_cmd('hdc list targets')
55        if 'Empty' in devices:
56            return []
57        return devices.splitlines()
58
59    def install_hap(self, hap_path, replace=True):
60        """
61        暗账应用
62        :param hap_path: hap包的路径
63        :param replace: 是否覆盖安装,true覆盖,否则不覆盖
64        :return:
65        """
66        logging.info('install hap')
67        if replace:
68            cmd = f'app install -r {hap_path}'
69        else:
70            cmd = f'app install {hap_path}'
71        return self.hdc(cmd)
72
73    def install_multi_hap(self, hap_list: list):
74        """
75        安装更新, 多hap可以指定多个文件路径
76        :param hap_list:
77        :return:
78        """
79        logging.info('install hap')
80        haps = ' '.join(hap_list)
81        cmd = 'install {}'.format(haps)
82        return self.hdc(cmd)
83
84    def bm_install(self, hap_list: list):
85        """
86        bm工具安装更新多个hap
87        :param hap_list:
88        :return:
89        """
90        logging.info('install hap')
91        haps = ' '.join(hap_list)
92        cmd = 'bm install -p {}'.format(haps)
93        return self.hdc_shell(cmd)
94
95    def uninstall_hap(self, bundle_name):
96        # 两个命令都可以
97        logging.info(f'uninstall {bundle_name}')
98        # cmd = 'hdc app uninstall {}'.format(bundle_name)
99        return self.hdc(f'uninstall {bundle_name}')
100
101    def bm_uninstall(self, bundle_name):
102        logging.info(f'uninstall {bundle_name}')
103        return self.hdc_shell(f'bm uninstall -n {bundle_name}')
104
105    def hdc_file_send(self, local, remote):
106        logging.info('send file to device')
107        return self.hdc(f'file send "{local}" "{remote}"')
108
109    def hdc_file_recv(self, remote, local=''):
110        logging.info('recv file from device')
111        local = local or self.report_path
112        return self.hdc(f'file recv "{remote}" "{local}"')
113
114    def hilog(self):
115        logging.info('hilog')
116        return self.hdc('hilog')
117
118    def get_udid(self):
119        logging.info('get udid')
120        return self.hdc('bm get --udid')
121
122    def kill_hdc_process(self):
123        logging.info('kill hdc process')
124        return self._execute_cmd('hdc kill')
125
126    def restart_hdc_process(self):
127        # logging.info('restart hdc process')
128        return self._execute_cmd('hdc start -r')
129
130    def reboot(self):
131        # logging.info('reboot device')
132        return self.hdc_shell('reboot')
133
134    def start_ability(self, bundle_name, ability_name):
135        # logging.info(f'start {bundle_name} application')
136        rst = self.hdc_shell(f'aa start -b {bundle_name} -a {ability_name}')
137        time.sleep(2)
138        return rst
139
140    def force_stop(self, bundle_name):
141        # logging.info(f'停掉{bundle_name}应用')
142        return self.hdc_shell(f'aa force-stop {bundle_name}')
143
144    def clean_app_data(self, bundle_name):
145        """
146        清除应用缓存和数据, -c缓存, -d应用数据
147        :param bundle_name:
148        :return:
149        """
150        if not bundle_name:
151            return
152        # logging.info(f'清理{bundle_name}应用数据和缓存')
153        return self.hdc_shell(f'bm clean -n {bundle_name} -c -d')
154
155    def disable_app(self, bundle_name):
156        """
157        禁止应用,应用在桌面消失
158        :param bundle_name:
159        :return:
160        """
161        # logging.info(f'禁止{bundle_name}应用,应用在桌面消失')
162        return self.hdc_shell(f'bm disable -n {bundle_name}')
163
164    def enable_app(self, bundle_name):
165        """
166        允许应用,应用显示在桌面上
167        :param bundle_name:
168        :return:
169        """
170        # logging.info(f'允许{bundle_name}应用,应用显示在桌面上')
171        return self.hdc_shell(f'bm enable -n {bundle_name}')
172
173    def dump_hap_configuration(self, bundle_name):
174        """
175        查看应用配置信息
176        :param bundle_name:
177        :return:
178        """
179        # logging.info(f'查看{bundle_name}应用配置信息')
180        return self.hdc_shell(f'bm dump -n {bundle_name}')
181
182    def stop_permission(self):
183        # self.click(504, 708)
184        # logging.info(f'消掉权限请求的弹窗')
185        focus_win = self.get_focus_window()
186        if 'permissionDialog1' in focus_win:
187            # logging.info(f'消掉权限请求的弹窗')
188            self.refresh_layout()
189            allow = self.get_element_by_condition(condition={'text': '允许', 'type': 'Button'})
190            self.click_element(allow)
191            time.sleep(2)
192        else:
193            logging.info(f'current window: {focus_win}')
194        # return self.force_stop('com.ohos.permissionmanager')
195
196    def click(self, x: int, y: int):
197        """
198        模拟触摸按下
199        :param x:
200        :param y:
201        :return:
202        """
203        # logging.info(f'点击({x},{y})坐标')
204        return self.hdc_shell(f'uinput -M -m {x} {y} -c 0')
205        # return self.hdc_shell(f'uinput -T -c {x} {y}')
206        # return self.hdc_shell(f'uitest uiInput click {x} {y}')
207
208    def click_element(self, e):
209        x, y = self.center_of_element(e)
210        return self.click(x, y)
211
212    def double_click(self, x, y):
213        # logging.info(f'双击({x},{y})坐标')
214        return self.hdc_shell(f'uitest uiInput doubleClick {x} {y}')
215
216    def double_click_element(self, e):
217        x, y = self.center_of_element(e)
218        return self.double_click(x, y)
219
220    def long_click(self, x, y):
221        # logging.info(f'长按({x},{y})坐标')
222        return self.hdc_shell(f'uitest uiInput longClick {x} {y}')
223
224    def long_click_element(self, e):
225        x, y = self.center_of_element(e)
226        return self.long_click(x, y)
227
228    def dirc_fling(self, direct=0):
229        """
230        模拟指定方向滑动
231        :param direct:direction (可选参数,滑动方向,可选值: [0,1,2,3], 滑动方向: [左,右,上,下],默认值: 0)
232        swipeVelocityPps_ (可选参数,滑动速度,取值范围: 200-40000, 默认值: 600, 单位: px/s)
233        stepLength(可选参数,滑动步长,默认值:滑动距离/50, 单位: px)
234        :return:
235        """
236        direct_map = {
237            0: '左',
238            1: '右',
239            2: '上',
240            3: '下',
241        }
242        if direct not in direct_map.keys():
243            direct = 0
244        # logging.info(f'向 {direct_map.get(direct)} 滑动')
245        return self.hdc_shell(f'uitest uiInput dircFling {direct}')
246
247    def swipe(self, from_x, from_y, to_x, to_y, velocity=600):
248        """
249        模拟慢滑操作
250        :param from_x:(必选参数,滑动起点x坐标)
251        :param from_y:(必选参数,滑动起点y坐标)
252        :param to_x:(必选参数,滑动终点x坐标)
253        :param to_y:(必选参数,滑动终点y坐标)
254        :param velocity: (可选参数,滑动速度,取值范围: 200-40000, 默认值: 600, 单位: px/s)
255        :return:
256        """
257        # 保证数据取值范围合理
258        # logging.info(f'从({from_x},{from_y})滑动到({to_x},{to_y}),滑动速度:{velocity}px/s')
259        return self.hdc_shell(f'uitest uiInput swipe {from_x} {from_y} {to_x} {to_y} {velocity}')
260
261    def fling(self, from_x, from_y, to_x, to_y, velocity=600):
262        """
263        模拟快滑操作
264        :param from_x:(必选参数,滑动起点x坐标)
265        :param from_y:(必选参数,滑动起点y坐标)
266        :param to_x:(必选参数,滑动终点x坐标)
267        :param to_y:(必选参数,滑动终点y坐标)
268        :param velocity: (可选参数,滑动速度,取值范围: 200-40000, 默认值: 600, 单位: px/s)
269        :return:
270        """
271        # 保证数据取值范围合理
272        # 保证数据取值范围合理
273        # logging.info(f'从({from_x},{from_y})快速滑动到({to_x},{to_y})')
274        return self.hdc_shell(f'uitest uiInput fling {from_x} {from_y} {to_x} {to_y}')
275
276    def drag(self, from_x, from_y, to_x, to_y, velocity=600):
277        """
278        拖拽,从(x1, y1)拖拽到(x2, y2)
279       :param from_x:(必选参数,滑动起点x坐标)
280        :param from_y:(必选参数,滑动起点y坐标)
281        :param to_x:(必选参数,滑动终点x坐标)
282        :param to_y:(必选参数,滑动终点y坐标)
283        :param velocity: (可选参数,滑动速度,取值范围: 200-40000, 默认值: 600, 单位: px/s)
284        """
285        # logging.info(f'从({from_x},{from_y})拖到({to_x},{to_y}),拖动速度:{velocity}px/s')
286        return self.hdc_shell(f'uitest uiInput drag {from_x} {from_y} {to_x} {to_y} {velocity}')
287
288    def key_event(self, key_code):
289        # logging.info(f'按下{key_code}键')
290        return self.hdc_shell(f'uitest uiInput keyEvent {key_code}')
291
292    def go_home(self):
293        return self.key_event('Home')
294
295    def go_back(self):
296        return self.key_event('Back')
297
298    def press_power_key(self):
299        return self.key_event('Power')
300
301    def press_recent_key(self):
302        return self.key_event('2078')
303
304    def clear_recent_task(self):
305        # logging.info('清理最近的任务')
306        self.press_recent_key()
307        time.sleep(0.5)
308        self.press_clear_btn()
309
310    def press_clear_btn(self):
311        self.click(360, 1170)
312
313    def input_text(self, x, y, text=''):
314        # logging.info(f'向({x, y})坐标处输入“{text}”')
315        return self.hdc_shell(f'uitest uiInput inputText {x} {y} {text}')
316
317    def wakeup(self):
318        # logging.info('点亮屏幕')
319        return self.hdc_shell('power-shell wakeup')
320
321    def suspend(self):
322        # logging.info('熄灭屏幕')
323        return self.hdc_shell('power-shell suspend')
324
325    def set_power_mode(self, mode='602'):
326        """
327        设置电源模式
328        :param mode:600 normal mode 正常模式
329                    601 power save mode省电模式
330                    602 performance mode性能模式,屏幕会常亮
331                    603 extreme power save mode极端省电模式
332        :return:
333        """
334        power_map = {
335            '600': '正常模式',
336            '601': '省电模式',
337            '602': '性能模式',
338            '603': '极端省电模式',
339        }
340        if mode not in power_map.keys():
341            mode = '602'
342        # logging.info(f'设置电源为{power_map.get(mode)}')
343        return self.hdc_shell(f'power-shell setmode {mode}')
344
345    def display_screen_state(self):
346        # logging.info('获取屏幕点亮状态')
347        return self.hdc_shell('hidumper -s 3308')
348
349    def get_render_size(self):
350        # logging.info('获取屏幕分辨率')
351        rst = self.hdc_shell('hidumper -s RenderService -a screen')
352        xy = re.findall(r'render size: (\d+)x(\d+)', rst)[0]
353        self.width, self.height = [int(i) for i in xy]
354
355    def dump_layout(self, file_name=''):
356        # logging.info('获取当前页面布局')
357        if file_name:
358            file_path = '/data/local/tmp/' + file_name
359            dump_rst = self.hdc_shell(f'uitest dumpLayout -p {file_path}')
360        else:
361            dump_rst = self.hdc_shell(f'uitest dumpLayout')
362        layout_file = dump_rst.split(':')[1].strip()
363        return layout_file
364
365    def save_layout_to_local(self, file_name=''):
366        layout_file = self.dump_layout(file_name)
367        self.hdc_file_recv(layout_file)
368
369    def refresh_layout(self, file_name=''):
370        """
371        :param file_name: 文件名不包含路径
372        :return:
373        """
374        file_name = file_name or 'tmp_layout.json'
375        tmp_file = self.dump_layout(file_name)
376        json_data = json.loads(self.hdc_shell(f'cat {tmp_file}'))
377        self._element_list = self._parse_attribute_nodes(json_data)
378        # 将控件按从上到下,从左到右的顺序排列
379        self._element_list.sort(key=lambda e: [self.center_of_element(e)[1], self.center_of_element(e)[0]])
380
381    def snapshot_display(self, jpeg=''):
382        """jpeg必须在/data/local/tmp目录"""
383        # logging.info('获取当前页面截图')
384        if jpeg:
385            jpeg = '/data/local/tmp/' + jpeg
386            shot_rst = self.hdc_shell(f'snapshot_display -f {jpeg}')
387        else:
388            shot_rst = self.hdc_shell('snapshot_display')
389        if 'success' not in shot_rst:
390            return ''
391        return re.findall(r'write to(.*)as jpeg', shot_rst)[0].strip()
392
393    def save_snapshot_to_local(self, file_name=''):
394        tmp_file = self.snapshot_display(file_name)
395        save_file = os.path.join(self.report_path, tmp_file.split('/')[-1])
396        self.hdc_file_recv(tmp_file, self.report_path)
397        return save_file
398
399    def clear_local_tmp(self):
400        self.hdc_shell('rm -rf /data/local/tmp/*')
401
402    def set_screen_timeout(self, timeout=600):
403        """
404        设置屏幕超时时间
405        :param timeout: 单位s
406        :return:
407        """
408        # logging.info(f'设置屏幕{timeout}s无操作后休眠')
409        return self.hdc_shell(f'power-shell timeout -o {timeout * 1000}')
410
411    def rm_faultlog(self):
412        # logging.info('删除fault log')
413        self.hdc_shell('rm -f /data/log/faultlog/SERVICE_BLOCK*')
414        self.hdc_shell('rm -f /data/log/faultlog/appfreeze*')
415        self.hdc_shell('rm -f /data/log/faultlog/temp/cppcrash*')
416        self.hdc_shell('rm -f /data/log/faultlog/faultlogger/jscrash*')
417        self.hdc_shell('rm -f /data/log/faultlog/faultlogger/appfreeze*')
418        self.hdc_shell('rm -f /data/log/faultlog/faultlogger/cppcrash*')
419
420    def start_hilog(self):
421        # logging.info('开启hilog')
422        self.hdc_shell('hilog -w stop;hilog -w clear')
423        self.hdc_shell('hilog -r;hilog -b INFO;hilog -w start -l 10M -n 1000')
424
425    def stop_and_collect_hilog(self, local_dir=''):
426        # logging.info('停止并收集hilog')
427        self.hdc_shell('hilog -w stop')
428        self.hdc_file_recv('/data/log/hilog/', local_dir)
429
430    def unlock(self):
431        """
432        滑动解锁
433        :return:
434        """
435        # logging.info('解锁屏幕')
436        return self.dirc_fling(2)
437
438    def assert_process_running(self, process):
439        # logging.info(f'检查{process}进程是否存在')
440        rst = self.hdc_shell(f'ps -ef | grep -w {process} | grep -v grep')
441        assert process in rst, f'process {process} not exist'
442
443    def get_pid(self, process):
444        # logging.info(f'获取{process}进程PID')
445        return self.hdc_shell(f'pidof {process}').strip()
446
447    def get_wifi_status(self):
448        # hidumper -ls查看所有hidumper服务
449        # logging.info('获取wifi状态')
450        status = self.hdc_shell('hidumper -s WifiDevice')
451        active_state = re.findall(r'WiFi active state: (.*)', status)[0].strip()
452        connection_status = re.findall(r'WiFi connection status: (.*)', status)[0].strip()
453
454        scan_status = self.hdc_shell('hidumper -s WifiScan')
455        is_scan_running = re.findall(r'Is scan service running: (.*)', scan_status)[0].strip()
456        return {
457            'active': active_state,
458            'connected': connection_status,
459            'scanning': is_scan_running
460        }
461
462    def dump_windows_manager_service(self):
463        rst = self.hdc_shell("hidumper -s WindowManagerService -a '-a'")
464        return rst
465
466    def get_focus_window(self):
467        time.sleep(1)
468        text = self.dump_windows_manager_service()
469        focus_win = re.findall(r'Focus window: (\d+)', text)[0].strip()
470        win_id_index = 3
471        focus_window_name = ''
472        for line in text.splitlines():
473            if line.startswith('Focus window'):
474                break
475            wms = re.match(r'\S*\s*(\d+\s+){' + str(win_id_index) + '}', line)
476            if wms:
477                data = wms.group().strip().split()
478                if data[win_id_index] == focus_win:
479                    focus_window_name = data[0]
480                    break
481        # logging.info('当前聚焦的窗口为:{}'.format(focus_window_name))
482        return focus_window_name
483
484    def get_win_id(self, window_name):
485        text = self.dump_windows_manager_service()
486        win = re.search(window_name + r'\s*(\d+\s+){3}', text)
487        if not win:
488            return
489        win_id = win.group().split()[-1]
490        return win_id
491
492    def get_navigationb_winid(self):
493        return self.get_win_id('SystemUi_NavigationB')
494
495    def is_soft_keyboard_on(self):
496        if self.soft_keyboard():
497            return True
498        return False
499
500    def soft_keyboard(self):
501        text = self.dump_windows_manager_service()
502        keyboard = re.search(r'softKeyboard1\s*(\d+\s+){8}\[\s+(\d+\s+){4}]', text)
503        if not keyboard:
504            return ''
505        return keyboard.group()
506
507    def get_elements_by_text(self, text):
508        ems = []
509        for e in self._element_list:
510            if e.get('text') == text:
511                ems.append(e)
512        return ems
513
514    def get_element_by_text(self, text, index=0):
515        ems = self.get_elements_by_text(text)
516        if not ems:
517            return
518        return ems[index]
519
520    def assert_text_exist(self, text):
521        element = self.get_elements_by_text(text)
522        rst = 'Yes' if element else 'No'
523        logging.info('check [text]="{}" exist? [{}]'.format(text, rst))
524        assert element
525
526    def get_elements_by_type(self, _type):
527        ems = []
528        for e in self._element_list:
529            if e.get('type') == _type:
530                ems.append(e)
531        return ems
532
533    def get_element_by_type(self, _type, index=0):
534        ems = self.get_elements_by_type(_type)
535        if not ems:
536            return
537        return ems[index]
538
539    def assert_type_exist(self, _type):
540        element = self.get_elements_by_type(_type)
541        rst = 'Yes' if element else 'No'
542        logging.info('check [type]="{}"exist? [{}]'.format(_type, rst))
543        assert element
544
545    def get_elements_by_key(self, key):
546        ems = []
547        for e in self._element_list:
548            if e.get('key') == key:
549                ems.append(e)
550        return ems
551
552    def get_element_by_key(self, key, index=0):
553        ems = self.get_elements_by_key(key)
554        if not ems:
555            return
556        return ems[index]
557
558    def assert_key_exist(self, key):
559        element = self.get_elements_by_key(key)
560        rst = 'Yes' if element else 'No'
561        logging.info('check [key]="{}"exist? [{}]'.format(key, rst))
562        assert element
563
564    def get_elements_by_condition(self, condition: dict):
565        ems = []
566        for e in self._element_list:
567            cs = set(condition.items())
568            if cs.issubset(set(e.items())):
569                ems.append(e)
570        return ems
571
572    def get_element_by_condition(self, condition, index=0):
573        ems = self.get_elements_by_condition(condition)
574        if not ems:
575            return
576        return ems[index]
577
578    def set_brightness(self, value=102):
579        return self.hdc_shell(f'power-shell display -s {value}')
580
581    @staticmethod
582    def center_of_element(e):
583        assert e, 'element not exist'
584        bounds = e.get('bounds')
585        x1, y1, x2, y2 = [int(i) for i in re.findall(r'\d+', bounds)]
586        x = (x1 + x2) // 2
587        y = (y1 + y2) // 2
588        return x, y
589
590    def _parse_attribute_nodes(self, json_obj, attr_list=None):
591        if attr_list is None:
592            attr_list = []
593
594        if isinstance(json_obj, dict):
595            for key, value in json_obj.items():
596                if key == 'attributes' and isinstance(value, dict):
597                    attr_list.append(value)
598                elif isinstance(value, (dict, list)):
599                    self._parse_attribute_nodes(value, attr_list)
600        elif isinstance(json_obj, list):
601            for item in json_obj:
602                self._parse_attribute_nodes(item, attr_list)
603        return attr_list
604