• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
import json
import logging
import os.path
import re
import subprocess
import threading
import time
8
9
class Device:
    """Wrapper around the ``hdc`` command-line tool for one device.

    Every public method builds an ``hdc`` command string, runs it through
    :meth:`_execute_cmd` and returns the captured text output.  The element
    helpers (``get_element*``, ``assert_*_exist``) work on the layout cached
    by :meth:`refresh_layout`.
    """

    # Class-level lock: all instances share it, so only one command runs at a time.
    lock = threading.Lock()

    @classmethod
    def _execute_cmd(cls, cmd):
        """Run ``cmd`` through the system shell and return its output.

        :param cmd: full command line (string) to execute
        :return: stdout if non-empty, otherwise stderr
        :raises subprocess.TimeoutExpired: if the command runs longer than 30 s
        """
        with cls.lock:
            logging.info(f'[In]{cmd}')
            rst = subprocess.run(cmd, capture_output=True, shell=True, timeout=30, encoding='utf-8')
            out_put = rst.stdout or rst.stderr
            # Short pause after every command, taken while still holding the lock.
            time.sleep(0.5)
            # Layout dumps produce far too much output, so they are not logged.
            if not re.search(r'cat /data/local/tmp/\S+json', cmd):
                logging.info(f'[Out]{out_put}')
            return out_put

    def __init__(self, sn):
        """
        :param sn: device identifier passed to ``hdc -t``
        """
        self.sn = sn
        self.report_path = ''
        self.resource_path = ''
        # Default screen size; get_render_size() overwrites it with real values.
        self.width = 720
        self.height = 1280
        # Allowed (min, max) velocity in px/s for swipe / fling / drag.
        self.velocity_range = (200, 40000)
        # Attribute dicts of the last layout loaded by refresh_layout().
        self._element_list = []

    def _clamp_velocity(self, velocity):
        """Limit a numeric ``velocity`` to ``self.velocity_range``.

        Non-numeric values are returned untouched so that callers who pass a
        pre-formatted string keep working.
        """
        if isinstance(velocity, bool) or not isinstance(velocity, (int, float)):
            return velocity
        low, high = self.velocity_range
        return max(low, min(high, velocity))

    # ------------------------------------------------------------------
    # hdc plumbing
    # ------------------------------------------------------------------
    def hdc_shell(self, cmd):
        """Run ``cmd`` in the device shell, retrying up to three times.

        When hdc reports that the device is not connected, the hdc server is
        restarted and the command is tried again after a 5 s pause.

        :param cmd: shell command (it is wrapped in double quotes)
        :return: output of the last attempt
        """
        out = ''
        shell_cmd = f'hdc -l0 -t {self.sn} shell "{cmd}"'
        for _ in range(3):
            out = self._execute_cmd(shell_cmd)
            if '[Fail]Device not founded or connected' not in out:
                break
            self.restart_hdc_process()
            time.sleep(5)
        return out

    def hdc(self, cmd):
        """Run an hdc host sub-command (not a device shell command) for this device."""
        return self._execute_cmd(f'hdc -l0 -t {self.sn} {cmd}')

    def hdc_version(self):
        """Return the output of ``hdc -v``."""
        return self._execute_cmd('hdc -v')

    def hdc_list_targets(self):
        """Return the lines printed by ``hdc list targets``.

        :return: list of non-blank lines, or ``[]`` when the output contains ``Empty``
        """
        devices = self._execute_cmd('hdc list targets')
        if 'Empty' in devices:
            return []
        # Drop blank lines so callers only see real entries.
        return [line.strip() for line in devices.splitlines() if line.strip()]

    # ------------------------------------------------------------------
    # application management
    # ------------------------------------------------------------------
    def install_hap(self, hap_path, replace=True):
        """
        Install an application.
        :param hap_path: path of the hap package
        :param replace: when true, install with ``-r`` (replace an existing install)
        :return: command output
        """
        logging.info('install hap')
        option = '-r ' if replace else ''
        return self.hdc(f'app install {option}{hap_path}')

    def install_multi_hap(self, hap_list: list):
        """
        Install or update; several hap file paths may be given at once.
        :param hap_list: list of hap paths
        :return: command output
        """
        logging.info('install hap')
        return self.hdc('install {}'.format(' '.join(hap_list)))

    def bm_install(self, hap_list: list):
        """
        Install or update several haps with the on-device ``bm`` tool.
        :param hap_list: list of hap paths
        :return: command output
        """
        logging.info('install hap')
        return self.hdc_shell('bm install -p {}'.format(' '.join(hap_list)))

    def uninstall_hap(self, bundle_name):
        """Uninstall ``bundle_name`` with ``hdc uninstall``."""
        logging.info(f'uninstall {bundle_name}')
        return self.hdc(f'uninstall {bundle_name}')

    def bm_uninstall(self, bundle_name):
        """Uninstall ``bundle_name`` with the on-device ``bm`` tool."""
        logging.info(f'uninstall {bundle_name}')
        return self.hdc_shell(f'bm uninstall -n {bundle_name}')

    def hdc_file_send(self, local, remote):
        """Copy ``local`` (host path) to ``remote`` (device path)."""
        logging.info('send file to device')
        return self.hdc(f'file send "{local}" "{remote}"')

    def hdc_file_recv(self, remote, local=''):
        """Copy ``remote`` (device path) to the host.

        :param remote: path on the device
        :param local: host destination; defaults to ``self.report_path``
        """
        logging.info('recv file from device')
        local = local or self.report_path
        return self.hdc(f'file recv "{remote}" "{local}"')

    def hilog(self):
        """Run ``hdc hilog`` and return its output."""
        logging.info('hilog')
        return self.hdc('hilog')

    def get_udid(self):
        """Return the output of ``bm get --udid``.

        ``bm`` is run through the device shell, like every other ``bm``
        call in this class.
        """
        logging.info('get udid')
        return self.hdc_shell('bm get --udid')

    def kill_hdc_process(self):
        """Run ``hdc kill``."""
        logging.info('kill hdc process')
        return self._execute_cmd('hdc kill')

    def restart_hdc_process(self):
        """Run ``hdc start -r``."""
        return self._execute_cmd('hdc start -r')

    def reboot(self):
        """Send ``reboot`` to the device shell."""
        return self.hdc_shell('reboot')

    def start_ability(self, bundle_name, ability_name):
        """Start an ability with ``aa start`` and wait 2 s afterwards."""
        rst = self.hdc_shell(f'aa start -b {bundle_name} -a {ability_name}')
        time.sleep(2)
        return rst

    def force_stop(self, bundle_name):
        """Force-stop ``bundle_name`` with ``aa force-stop``."""
        return self.hdc_shell(f'aa force-stop {bundle_name}')

    def clean_app_data(self, bundle_name):
        """
        Clear the application's cache (``-c``) and data (``-d``).
        :param bundle_name: bundle to clean; nothing is done when it is empty
        :return: command output, or ``None`` when ``bundle_name`` is empty
        """
        if not bundle_name:
            return None
        return self.hdc_shell(f'bm clean -n {bundle_name} -c -d')

    def disable_app(self, bundle_name):
        """
        Disable the application; it disappears from the home screen.
        :param bundle_name: bundle to disable
        :return: command output
        """
        return self.hdc_shell(f'bm disable -n {bundle_name}')

    def enable_app(self, bundle_name):
        """
        Enable the application; it is shown on the home screen again.
        :param bundle_name: bundle to enable
        :return: command output
        """
        return self.hdc_shell(f'bm enable -n {bundle_name}')

    def dump_hap_configuration(self, bundle_name):
        """
        Show the application's configuration (``bm dump``).
        :param bundle_name: bundle to inspect
        :return: command output
        """
        return self.hdc_shell(f'bm dump -n {bundle_name}')

    def stop_permission(self):
        """Dismiss the permission dialog, if it is the focused window.

        When the focused window name contains ``permissionDialog1`` the layout
        is refreshed and the button whose text is '允许' is clicked; otherwise
        the current window name is only logged.
        """
        focus_win = self.get_focus_window()
        if 'permissionDialog1' not in focus_win:
            logging.info(f'current window: {focus_win}')
            return
        self.refresh_layout()
        allow = self.get_element_by_condition(condition={'text': '允许', 'type': 'Button'})
        self.click_element(allow)
        time.sleep(2)

    # ------------------------------------------------------------------
    # input simulation
    # ------------------------------------------------------------------
    def click(self, x: int, y: int):
        """
        Simulate a press at the given coordinates (sent through ``uinput``).
        :param x: x coordinate
        :param y: y coordinate
        :return: command output
        """
        # Alternatives that were tried before: 'uinput -T -c x y' and
        # 'uitest uiInput click x y'.
        return self.hdc_shell(f'uinput -M -m {x} {y} -c 0')

    def click_element(self, e):
        """Click the centre of element ``e``."""
        x, y = self.center_of_element(e)
        return self.click(x, y)

    def double_click(self, x, y):
        """Double-click at (x, y)."""
        return self.hdc_shell(f'uitest uiInput doubleClick {x} {y}')

    def double_click_element(self, e):
        """Double-click the centre of element ``e``."""
        x, y = self.center_of_element(e)
        return self.double_click(x, y)

    def long_click(self, x, y):
        """Long-press at (x, y)."""
        return self.hdc_shell(f'uitest uiInput longClick {x} {y}')

    def long_click_element(self, e):
        """Long-press the centre of element ``e``."""
        x, y = self.center_of_element(e)
        return self.long_click(x, y)

    def dirc_fling(self, direct=0):
        """
        Simulate a fling in a fixed direction.
        :param direct: direction, one of 0 (left), 1 (right), 2 (up), 3 (down);
            any other value falls back to 0
        :return: command output
        """
        if direct not in (0, 1, 2, 3):
            direct = 0
        return self.hdc_shell(f'uitest uiInput dircFling {direct}')

    def swipe(self, from_x, from_y, to_x, to_y, velocity=600):
        """
        Simulate a slow swipe.
        :param from_x: start x coordinate (required)
        :param from_y: start y coordinate (required)
        :param to_x: end x coordinate (required)
        :param to_y: end y coordinate (required)
        :param velocity: speed in px/s, clamped to ``self.velocity_range``; default 600
        :return: command output
        """
        velocity = self._clamp_velocity(velocity)
        return self.hdc_shell(f'uitest uiInput swipe {from_x} {from_y} {to_x} {to_y} {velocity}')

    def fling(self, from_x, from_y, to_x, to_y, velocity=600):
        """
        Simulate a fast swipe (fling).
        :param from_x: start x coordinate (required)
        :param from_y: start y coordinate (required)
        :param to_x: end x coordinate (required)
        :param to_y: end y coordinate (required)
        :param velocity: speed in px/s, clamped to ``self.velocity_range``; default 600
        :return: command output
        """
        # The velocity is forwarded to the command, as swipe() and drag() do.
        velocity = self._clamp_velocity(velocity)
        return self.hdc_shell(f'uitest uiInput fling {from_x} {from_y} {to_x} {to_y} {velocity}')

    def drag(self, from_x, from_y, to_x, to_y, velocity=600):
        """
        Drag from (from_x, from_y) to (to_x, to_y).
        :param from_x: start x coordinate (required)
        :param from_y: start y coordinate (required)
        :param to_x: end x coordinate (required)
        :param to_y: end y coordinate (required)
        :param velocity: speed in px/s, clamped to ``self.velocity_range``; default 600
        :return: command output
        """
        velocity = self._clamp_velocity(velocity)
        return self.hdc_shell(f'uitest uiInput drag {from_x} {from_y} {to_x} {to_y} {velocity}')

    def key_event(self, key_code):
        """Send ``key_code`` with ``uitest uiInput keyEvent``."""
        return self.hdc_shell(f'uitest uiInput keyEvent {key_code}')

    def go_home(self):
        """Send the ``Home`` key event."""
        return self.key_event('Home')

    def go_back(self):
        """Send the ``Back`` key event."""
        return self.key_event('Back')

    def press_power_key(self):
        """Send the ``Power`` key event."""
        return self.key_event('Power')

    def press_recent_key(self):
        """Click the fixed coordinate (514, 1243).

        NOTE(review): presumably the "recent tasks" button on the default
        720x1280 layout - confirm for other screen sizes.
        """
        return self.click(514, 1243)

    def clear_recent_task(self):
        """Press the recent key, wait 0.5 s, then press the clear button."""
        self.press_recent_key()
        time.sleep(0.5)
        self.press_clear_btn()

    def press_clear_btn(self):
        """Click the fixed coordinate (360, 1170).

        NOTE(review): presumably the "clear all" button of the recent-task
        view on the default 720x1280 layout - confirm.
        """
        self.click(360, 1170)

    def input_text(self, x, y, text=''):
        """Send ``text`` to the position (x, y) with ``uitest uiInput inputText``."""
        return self.hdc_shell(f'uitest uiInput inputText {x} {y} {text}')

    # ------------------------------------------------------------------
    # power / display
    # ------------------------------------------------------------------
    def wakeup(self):
        """Run ``power-shell wakeup`` (turn the screen on)."""
        return self.hdc_shell('power-shell wakeup')

    def suspend(self):
        """Run ``power-shell suspend`` (turn the screen off)."""
        return self.hdc_shell('power-shell suspend')

    def set_power_mode(self, mode='602'):
        """
        Set the power mode.
        :param mode: 600 normal mode
                     601 power save mode
                     602 performance mode (the screen stays on)
                     603 extreme power save mode
                     Integers are accepted too; unknown values fall back to 602.
        :return: command output
        """
        mode = str(mode)
        if mode not in ('600', '601', '602', '603'):
            mode = '602'
        return self.hdc_shell(f'power-shell setmode {mode}')

    def display_screen_state(self):
        """Return the output of ``hidumper -s 3308``."""
        return self.hdc_shell('hidumper -s 3308')

    def get_render_size(self):
        """Read the render size from hidumper into ``self.width`` / ``self.height``.

        :raises IndexError: if the output has no ``render size: WxH`` entry
        """
        rst = self.hdc_shell('hidumper -s RenderService -a screen')
        width, height = re.findall(r'render size: (\d+)x(\d+)', rst)[0]
        self.width, self.height = int(width), int(height)

    def set_screen_timeout(self, timeout=600):
        """
        Set the screen timeout.
        :param timeout: seconds (converted to milliseconds for the command)
        :return: command output
        """
        return self.hdc_shell(f'power-shell timeout -o {timeout * 1000}')

    def set_brightness(self, value=102):
        """Run ``power-shell display -s <value>``."""
        return self.hdc_shell(f'power-shell display -s {value}')

    def unlock(self):
        """
        Unlock by flinging in direction 2 (up).
        :return: command output
        """
        return self.dirc_fling(2)

    # ------------------------------------------------------------------
    # layout / screenshots
    # ------------------------------------------------------------------
    def dump_layout(self, file_name=''):
        """Dump the current page layout on the device.

        :param file_name: optional file name; the file is written below
            ``/data/local/tmp/``
        :return: the text after the first ``:`` of the command output
            (taken to be the path of the layout file)
        :raises IndexError: if the output contains no ``:``
        """
        if file_name:
            file_path = '/data/local/tmp/' + file_name
            dump_rst = self.hdc_shell(f'uitest dumpLayout -p {file_path}')
        else:
            dump_rst = self.hdc_shell('uitest dumpLayout')
        return dump_rst.split(':')[1].strip()

    def save_layout_to_local(self, file_name=''):
        """Dump the layout and copy the file to ``self.report_path``."""
        layout_file = self.dump_layout(file_name)
        self.hdc_file_recv(layout_file)

    def refresh_layout(self, file_name=''):
        """
        Dump the layout, parse it and cache the element list.
        :param file_name: file name without a directory; default ``tmp_layout.json``
        :return:
        """
        file_name = file_name or 'tmp_layout.json'
        tmp_file = self.dump_layout(file_name)
        json_data = json.loads(self.hdc_shell(f'cat {tmp_file}'))
        self._element_list = self._parse_attribute_nodes(json_data)
        # Order elements top-to-bottom, then left-to-right, i.e. by (y, x).
        self._element_list.sort(key=lambda e: self.center_of_element(e)[::-1])

    def snapshot_display(self, jpeg=''):
        """Take a screenshot on the device.

        :param jpeg: optional file name; the file is written below ``/data/local/tmp/``
        :return: device path reported by the command, or ``''`` when the
            output does not contain ``success``
        """
        if jpeg:
            jpeg = '/data/local/tmp/' + jpeg
            shot_rst = self.hdc_shell(f'snapshot_display -f {jpeg}')
        else:
            shot_rst = self.hdc_shell('snapshot_display')
        if 'success' not in shot_rst:
            return ''
        return re.findall(r'write to(.*)as jpeg', shot_rst)[0].strip()

    def save_snapshot_to_local(self, file_name=''):
        """Take a screenshot and copy it into ``self.report_path``.

        :return: host path of the saved file, or ``''`` when the screenshot failed
        """
        tmp_file = self.snapshot_display(file_name)
        if not tmp_file:
            # Nothing was captured, so there is nothing to receive.
            return ''
        save_file = os.path.join(self.report_path, tmp_file.split('/')[-1])
        self.hdc_file_recv(tmp_file, self.report_path)
        return save_file

    def clear_local_tmp(self):
        """Delete everything below ``/data/local/tmp/`` on the device."""
        self.hdc_shell('rm -rf /data/local/tmp/*')

    # ------------------------------------------------------------------
    # logs
    # ------------------------------------------------------------------
    def rm_faultlog(self):
        """Delete the fault-log files matching a fixed set of patterns."""
        patterns = (
            '/data/log/faultlog/SERVICE_BLOCK*',
            '/data/log/faultlog/appfreeze*',
            '/data/log/faultlog/temp/cppcrash*',
            '/data/log/faultlog/faultlogger/jscrash*',
            '/data/log/faultlog/faultlogger/appfreeze*',
            '/data/log/faultlog/faultlogger/cppcrash*',
        )
        for pattern in patterns:
            self.hdc_shell(f'rm -f {pattern}')

    def start_hilog(self):
        """Stop and clear any persisted hilog, then start a new persisted capture."""
        self.hdc_shell('hilog -w stop;hilog -w clear')
        self.hdc_shell('hilog -r;hilog -b INFO;hilog -w start -l 10M -n 1000')

    def stop_and_collect_hilog(self, local_dir=''):
        """Stop the persisted hilog and copy ``/data/log/hilog/`` to the host.

        :param local_dir: host destination; defaults to ``self.report_path``
        """
        self.hdc_shell('hilog -w stop')
        self.hdc_file_recv('/data/log/hilog/', local_dir)

    # ------------------------------------------------------------------
    # processes / services
    # ------------------------------------------------------------------
    def assert_process_running(self, process):
        """Assert that ``process`` shows up in ``ps -ef`` on the device."""
        rst = self.hdc_shell(f'ps -ef | grep -w {process} | grep -v grep')
        assert process in rst, f'process {process} not exist'

    def get_pid(self, process):
        """Return the stripped output of ``pidof <process>``."""
        return self.hdc_shell(f'pidof {process}').strip()

    def get_wifi_status(self):
        """Collect Wi-Fi state from hidumper (``hidumper -ls`` lists all services).

        :return: dict with the keys ``active``, ``connected`` and ``scanning``
        :raises IndexError: if an expected line is missing from the output
        """
        status = self.hdc_shell('hidumper -s WifiDevice')
        active_state = re.findall(r'WiFi active state: (.*)', status)[0].strip()
        connection_status = re.findall(r'WiFi connection status: (.*)', status)[0].strip()

        scan_status = self.hdc_shell('hidumper -s WifiScan')
        is_scan_running = re.findall(r'Is scan service running: (.*)', scan_status)[0].strip()
        return {
            'active': active_state,
            'connected': connection_status,
            'scanning': is_scan_running,
        }

    def dump_windows_manager_service(self):
        """Return the output of ``hidumper -s WindowManagerService -a '-a'``."""
        return self.hdc_shell("hidumper -s WindowManagerService -a '-a'")

    def get_focus_window(self):
        """Return the name of the focused window, or ``''`` if it is not listed.

        Waits 1 s, reads the id after ``Focus window:`` in the window-manager
        dump, then scans the lines before that marker for a row whose third
        numeric column equals the id; the first token of that row is returned.

        :raises IndexError: if the dump has no ``Focus window: <id>`` entry
        """
        time.sleep(1)
        text = self.dump_windows_manager_service()
        focus_win = re.findall(r'Focus window: (\d+)', text)[0].strip()
        win_id_index = 3
        # Row shape: <name> followed by at least ``win_id_index`` numeric columns.
        row_pattern = re.compile(r'\S*\s*(\d+\s+){' + str(win_id_index) + '}')
        for line in text.splitlines():
            if line.startswith('Focus window'):
                break
            row = row_pattern.match(line)
            if not row:
                continue
            data = row.group().strip().split()
            if data[win_id_index] == focus_win:
                return data[0]
        return ''

    def get_win_id(self, window_name):
        """Return the third number following ``window_name`` in the dump, or ``None``.

        ``window_name`` is used as part of a regular expression.
        """
        text = self.dump_windows_manager_service()
        win = re.search(window_name + r'\s*(\d+\s+){3}', text)
        if not win:
            return None
        return win.group().split()[-1]

    def get_navigationb_winid(self):
        """Return the window id of ``SystemUi_NavigationB``."""
        return self.get_win_id('SystemUi_NavigationB')

    def is_soft_keyboard_on(self):
        """Return ``True`` when :meth:`soft_keyboard` finds a keyboard row."""
        return bool(self.soft_keyboard())

    def soft_keyboard(self):
        """Return the ``softKeyboard1`` row of the window-manager dump, or ``''``."""
        text = self.dump_windows_manager_service()
        keyboard = re.search(r'softKeyboard1\s*(\d+\s+){8}\[\s+(\d+\s+){4}]', text)
        return keyboard.group() if keyboard else ''

    # ------------------------------------------------------------------
    # element lookup (works on the layout cached by refresh_layout)
    # ------------------------------------------------------------------
    def get_elements_by_text(self, text):
        """Return all cached elements whose ``text`` attribute equals ``text``."""
        return [e for e in self._element_list if e.get('text') == text]

    def get_element_by_text(self, text, index=0):
        """Return the ``index``-th element with the given text, or ``None`` if none match."""
        ems = self.get_elements_by_text(text)
        return ems[index] if ems else None

    def assert_text_exist(self, text):
        """Assert that at least one cached element has the given text."""
        element = self.get_elements_by_text(text)
        rst = 'Yes' if element else 'No'
        logging.info('check [text]="{}" exist? [{}]'.format(text, rst))
        assert element

    def get_elements_by_type(self, _type):
        """Return all cached elements whose ``type`` attribute equals ``_type``."""
        return [e for e in self._element_list if e.get('type') == _type]

    def get_element_by_type(self, _type, index=0):
        """Return the ``index``-th element of the given type, or ``None`` if none match."""
        ems = self.get_elements_by_type(_type)
        return ems[index] if ems else None

    def assert_type_exist(self, _type):
        """Assert that at least one cached element has the given type."""
        element = self.get_elements_by_type(_type)
        rst = 'Yes' if element else 'No'
        logging.info('check [type]="{}"exist? [{}]'.format(_type, rst))
        assert element

    def get_elements_by_key(self, key):
        """Return all cached elements whose ``key`` attribute equals ``key``."""
        return [e for e in self._element_list if e.get('key') == key]

    def get_element_by_key(self, key, index=0):
        """Return the ``index``-th element with the given key, or ``None`` if none match."""
        ems = self.get_elements_by_key(key)
        return ems[index] if ems else None

    def assert_key_exist(self, key):
        """Assert that at least one cached element has the given key."""
        element = self.get_elements_by_key(key)
        rst = 'Yes' if element else 'No'
        logging.info('check [key]="{}"exist? [{}]'.format(key, rst))
        assert element

    def get_elements_by_condition(self, condition: dict):
        """Return all cached elements that contain every item of ``condition``.

        :param condition: attribute name -> required value
        """
        # Sentinel so that a missing attribute never equals a wanted value.
        missing = object()
        wanted = list(condition.items())
        return [
            e for e in self._element_list
            if all(e.get(name, missing) == value for name, value in wanted)
        ]

    def get_element_by_condition(self, condition, index=0):
        """Return the ``index``-th element matching ``condition``, or ``None`` if none match."""
        ems = self.get_elements_by_condition(condition)
        return ems[index] if ems else None

    @staticmethod
    def center_of_element(e):
        """Return the ``(x, y)`` centre of element ``e``.

        The first four integers found in ``e['bounds']`` are read as
        ``x1, y1, x2, y2``; negative coordinates are supported.

        :raises AssertionError: if ``e`` is empty or ``None``
        :raises ValueError: if ``bounds`` does not contain exactly four integers
        """
        assert e, 'element not exist'
        bounds = e.get('bounds')
        x1, y1, x2, y2 = [int(i) for i in re.findall(r'-?\d+', bounds)]
        return (x1 + x2) // 2, (y1 + y2) // 2

    def _parse_attribute_nodes(self, json_obj, attr_list=None):
        """Collect every ``attributes`` dict found in a nested JSON structure.

        :param json_obj: parsed JSON (dicts / lists / scalars)
        :param attr_list: accumulator used by the recursion; a new list is
            created when omitted
        :return: list of the ``attributes`` dicts in traversal order
        """
        if attr_list is None:
            attr_list = []

        if isinstance(json_obj, dict):
            for key, value in json_obj.items():
                if key == 'attributes' and isinstance(value, dict):
                    # An attributes dict is a leaf: it is stored, not descended into.
                    attr_list.append(value)
                elif isinstance(value, (dict, list)):
                    self._parse_attribute_nodes(value, attr_list)
        elif isinstance(json_obj, list):
            for item in json_obj:
                self._parse_attribute_nodes(item, attr_list)
        return attr_list
605