• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18import os
19import time
20
21from devicetest.core.variables import DeccVariable
22from devicetest.log.logger import DeviceTestLog as log
23from devicetest.utils.file_util import create_dir
24from xdevice import stop_standing_subprocess
25from xdevice import DeviceConnectorType
26
# Module-level network defaults. None of them is referenced by the code
# visible in this file.
# NOTE(review): presumably the host address/port and the device-side port
# used when forwarding to an on-device agent -- confirm against callers.
LOCAL_IP = "127.0.0.1"
LOCAL_PORT = 6001
URL = "/"
FORWARD_PORT = 9501
31
32
class ScreenAgent:
    """Per-device helper that captures screenshots and builds their paths.

    One agent is cached per device serial number (see ``get_instance``).
    Most functionality is exposed as classmethods that receive the device
    explicitly; the instance itself only tracks resources released by
    ``terminate``.
    """

    # Cache of agents keyed by ``device.device_sn``.
    SCREEN_AGENT_MAP = {}

    # Maps ``exe_type`` to the per-case counter attribute appended to the
    # file name; any other type (except "stepImage") uses "dump_xml_num".
    _COUNTER_BY_EXE_TYPE = {
        "takeImage": "image_num",
        "videoRecord": "video_num",
    }

    def __init__(self, device):
        """Bind the agent to ``device`` and start with no resources held."""
        self._device = device
        self.log = device.log
        self.proc = None
        self.thread = None
        self.local_port = None
        self.is_server_started = False

    def __del__(self):
        # terminate() tolerates partially initialised instances and is
        # idempotent, so running it again from the finalizer is harmless.
        self.terminate()

    @staticmethod
    def _replace_suffix(value, old_suffix, new_suffix):
        """Return ``value`` with a trailing ``old_suffix`` replaced.

        When ``value`` does not end with ``old_suffix`` its current file
        extension (if any) is replaced instead, so no characters of the
        base name are lost.
        """
        if not old_suffix:
            return value + new_suffix
        if value.endswith(old_suffix):
            return value[:-len(old_suffix)] + new_suffix
        return os.path.splitext(value)[0] + new_suffix

    @classmethod
    def get_instance(cls, _device):
        """Return the cached agent for ``_device``, creating it if needed."""
        _device.log.debug("in get instance.")
        instance_sn = _device.device_sn
        if instance_sn in ScreenAgent.SCREEN_AGENT_MAP:
            return ScreenAgent.SCREEN_AGENT_MAP[instance_sn]

        agent = ScreenAgent(_device)
        ScreenAgent.SCREEN_AGENT_MAP[instance_sn] = agent
        _device.log.debug("out get instance.")
        return agent

    @classmethod
    def remove_instance(cls, _device):
        """Terminate and forget the cached agent of ``_device``, if any."""
        _sn = _device.device_sn
        if _sn in ScreenAgent.SCREEN_AGENT_MAP:
            ScreenAgent.SCREEN_AGENT_MAP[_sn].terminate()
            del ScreenAgent.SCREEN_AGENT_MAP[_sn]

    @classmethod
    def get_screenshot_dir(cls):
        """Return ``<case_screenshot_dir>/<suite_name>/<case name>``."""
        case = DeccVariable.cur_case()
        return os.path.join(case.case_screenshot_dir, case.suite_name, case.name)

    @classmethod
    def get_take_picture_path(cls, _device, picture_name,
                              ext=".png", exe_type="takeImage"):
        """Build the local path and file name for a captured artifact.

        :param _device: device whose ``device_sn`` prefixes the file name.
        :param picture_name: base name, or the path of an existing file
            (which is then returned unchanged).
        :param ext: file suffix including the dot.
        :param exe_type: "takeImage" (default), "videoRecord" or
            "stepImage"; any other value (e.g. "dumpWindow") uses the
            case's ``dump_xml_num`` counter.
        :return: ``(full_path, file_name)``.

        Except for "stepImage", the matching per-case counter is appended
        to the name and then incremented.
        """
        if os.path.isfile(picture_name):
            folder = os.path.dirname(picture_name)
            # A bare file name has no directory part; nothing to create.
            if folder:
                create_dir(folder)
            return picture_name, os.path.basename(picture_name)

        folder = cls.get_screenshot_dir()
        create_dir(folder)
        if picture_name.endswith(ext):
            # Remove the suffix only. str.strip(ext) would remove any of the
            # suffix's characters from both ends of the name.
            picture_name = cls._replace_suffix(picture_name, ext, "")

        case = DeccVariable.cur_case()
        serial = _device.device_sn.replace("?", "sn").replace(":", "_")
        if exe_type == "stepImage":
            save_name = "{}.{}{}".format(serial, picture_name, ext)
        else:
            counter = cls._COUNTER_BY_EXE_TYPE.get(exe_type, "dump_xml_num")
            save_name = "{}.{}{}{}".format(
                serial, picture_name, getattr(case, counter), ext)
            setattr(case, counter, getattr(case, counter) + 1)

        return os.path.join(folder, save_name), save_name

    @classmethod
    def screen_take_picture(cls, args, result, _ta=None, is_raise_exception=True):
        """Placeholder hook; does nothing in this implementation."""
        # Original note: when the phone is off, the screenshot function can
        # be switched off here.
        pass

    @classmethod
    def _do_capture(cls, _device, link, path, ext=".png"):
        """Take a screenshot on the device and pull it to ``path``.

        :param _device: device to capture from.
        :param link: report-relative link of the picture.
        :param path: local destination path.
        :param ext: suffix that ``link`` and ``path`` were built with.
        :return: ``(path, link)``; for devices exposing ``is_oh`` both are
            rewritten to end in ".jpeg".
        """
        if hasattr(_device, "is_oh"):
            new_ext = ".jpeg"
            link = cls._replace_suffix(link, ext, new_ext)
            path = cls._replace_suffix(path, ext, new_ext)
            remote = "/data/local/tmp/xdevice_screenshot{}".format(new_ext)
            _device.execute_shell_command(
                "snapshot_display -f {}".format(remote), timeout=60000)
            # Adaptation for non-root devices: wait briefly before pulling.
            if hasattr(_device, "is_root") and not getattr(_device, "is_root", False):
                time.sleep(1)
            _device.pull_file(remote, path)
            _device.execute_shell_command("rm -f {}".format(remote))
        else:
            remote = "/data/local/tmp/screen.png"
            _device.connector.shell("screencap -p {}".format(remote), timeout=60000)
            _device.pull_file(remote, path, timeout=30000)
            _device.execute_shell_command("rm -f {}".format(remote))
            # Shrink the pulled picture (quality 80); compress_image is a
            # no-op when its optional dependencies are missing.
            cls.compress_image(path)
        return path, link

    @classmethod
    def __screen_and_save_picture(cls, _device, name, ext=".png", exe_type="takeImage"):
        """Capture the device screen, save it and log an HTML link to it.

        :param name: picture name; the full path is derived through
            ``get_image_dir_path``.
        :param ext: requested suffix, e.g. ".png" or ".jpg".
        :param exe_type: forwarded to ``get_image_dir_path``.
        :return: ``(path, link)`` of the saved picture.
        """
        path, link = cls.get_image_dir_path(_device, name, ext, exe_type=exe_type)
        # The file suffix may be changed inside _do_capture.
        path, link = cls._do_capture(_device, link, path, ext)
        _device.log.info(
            '<a href="{}" target="_blank">Screenshot: {}<img style="display: none;" {}/>'
            '</a>'.format(link, path, cls.resize_image(path)))
        return path, link

    @classmethod
    def capture_step_picture(cls, _device, name, ext=".png"):
        """Placeholder for per-step screenshots; always returns ``(None, "")``."""
        return None, ""

    @classmethod
    def compress_image(cls, img_path, ratio=0.5, quality=80):
        """Downscale the image at ``img_path`` in place (best effort).

        :param img_path: file to rewrite.
        :param ratio: scale factor applied to width and height.
        :param quality: JPEG quality passed to the encoder.

        The result is JPEG-encoded and written back to the same path, so
        the file keeps its original name. Does nothing when ``cv2`` or
        ``numpy`` is not installed or the file cannot be decoded.
        """
        try:
            import cv2
            import numpy as np
        except ImportError:
            return
        pic = cv2.imdecode(np.fromfile(img_path, dtype=np.uint8), -1)
        if pic is None:
            log.warning("compress image skipped, cannot decode: {}".format(img_path))
            return
        # shape is (h, w) for grayscale and (h, w, channels) otherwise.
        height, width = pic.shape[:2]
        new_size = (max(1, int(width * ratio)), max(1, int(height * ratio)))
        pic = cv2.resize(pic, new_size)
        params = [cv2.IMWRITE_JPEG_QUALITY, quality]
        cv2.imencode('.jpeg', pic, params=params)[1].tofile(img_path)

    @classmethod
    def get_image_dir_path(cls, _device, name, ext=".png", exe_type="takeImage"):
        """Return ``(local_path, report_link)`` for a new capture.

        The device's current date/time is prefixed to ``name`` when it can
        be read; otherwise ``name`` is used unchanged. ``exe_type``
        defaults to "takeImage" and is forwarded to
        ``get_take_picture_path``.
        """
        try:
            if hasattr(_device, "is_oh"):
                phone_time = _device.execute_shell_command("date '+%Y%m%d_%H%M%S'").strip()
            else:
                phone_time = _device.connector.shell("date '+%Y%m%d_%H%M%S'").strip()
        except Exception as exception:
            _device.log.error("get date exception error")
            _device.log.debug("get date exception: {}".format(exception))
        else:
            name = "{}.{}".format(phone_time, name)
        path, save_name = cls.get_take_picture_path(_device, name, ext, exe_type)
        link = os.path.join(DeccVariable.cur_case().name, save_name)
        return path, link

    @classmethod
    def resize_image(cls, file_path, max_height=480, file_type="image"):
        """Return HTML ``width``/``height`` attributes for a thumbnail.

        :param file_path: picture or video whose size is probed.
        :param max_height: maximum display height; larger media is scaled
            down proportionally.
        :param file_type: "image" (probed with PIL) or "video" (probed
            with cv2).
        :return: string such as ``'width="270" height="480"'``.

        Falls back to 1080x1920 when the file does not exist or the
        required optional package is missing.
        """
        width, height = 1080, 1920
        if os.path.exists(file_path):
            if file_type == "image":
                try:
                    from PIL import Image
                except ImportError as error:
                    log.warning(
                        "get image width and height error: {}, use default".format(error))
                else:
                    with Image.open(file_path) as img:
                        width, height = img.width, img.height
            elif file_type == "video":
                try:
                    import cv2
                    video_info = cv2.VideoCapture(file_path)
                    width = int(video_info.get(cv2.CAP_PROP_FRAME_WIDTH))
                    height = int(video_info.get(cv2.CAP_PROP_FRAME_HEIGHT))
                    video_info.release()
                except Exception as e:
                    log.warning("get video width and height error: {}, use default".format(e))
                if width == 0 or height == 0:
                    width, height = 1080, 1920
        if height < max_height:
            return 'width="%d" height="%d"' % (width, height)
        try:
            ratio = max_height / height
        except ZeroDivisionError:
            log.error("shot image height is 0")
            ratio = 1
        return 'width="%d" height="%d"' % (width * ratio, max_height)

    def terminate(self):
        """Release the port forward, helper process and worker thread.

        Safe to call repeatedly and on a partially initialised instance:
        released resources are reset to ``None`` so the finalizer does not
        repeat the device commands.
        """
        port = getattr(self, "local_port", None)
        if isinstance(port, int):
            if hasattr(self._device, "is_oh") or \
                    self._device.usb_type == DeviceConnectorType.hdc:
                self._device.connector_command('fport rm tcp:{}'.format(port))
            else:
                self._device.connector_command('forward --remove tcp:{}'.format(port))
            self.local_port = None

        proc = getattr(self, "proc", None)
        if proc is not None:
            stop_standing_subprocess(proc)
            self.proc = None

        thread = getattr(self, "thread", None)
        if thread is not None:
            start = time.time()
            # At the end of a task, wait (up to 3 s) for picture generation
            # to finish. Thread.isAlive() was removed in Python 3.9.
            while thread.is_alive() and time.time() - start < 3:
                time.sleep(0.1)
243