#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import time

from devicetest.core.variables import DeccVariable
from devicetest.log.logger import DeviceTestLog as log
from devicetest.utils.file_util import create_dir
from xdevice import stop_standing_subprocess
from xdevice import DeviceConnectorType

LOCAL_IP = "127.0.0.1"
LOCAL_PORT = 6001
URL = "/"
FORWARD_PORT = 9501


class ScreenAgent:
    """Screenshot helper bound to one device.

    Instances are cached in ``SCREEN_AGENT_MAP`` keyed by the device's
    ``device_sn``; use :meth:`get_instance` / :meth:`remove_instance` to
    manage them.
    """

    # device_sn -> ScreenAgent
    SCREEN_AGENT_MAP = {}

    def __init__(self, device):
        self._device = device
        self.log = device.log
        self.proc = None
        self.thread = None
        self.local_port = None
        self.is_server_started = False

    def __del__(self):
        self.terminate()

    @classmethod
    def get_instance(cls, _device):
        """Return the cached agent for ``_device``, creating it on first use."""
        _device.log.debug("in get instance.")
        instance_sn = _device.device_sn
        if instance_sn in ScreenAgent.SCREEN_AGENT_MAP:
            return ScreenAgent.SCREEN_AGENT_MAP[instance_sn]

        agent = ScreenAgent(_device)
        ScreenAgent.SCREEN_AGENT_MAP[instance_sn] = agent
        _device.log.debug("out get instance.")
        return agent

    @classmethod
    def remove_instance(cls, _device):
        """Terminate and drop the cached agent for ``_device``, if any."""
        _sn = _device.device_sn
        if _sn in ScreenAgent.SCREEN_AGENT_MAP:
            ScreenAgent.SCREEN_AGENT_MAP[_sn].terminate()
            del ScreenAgent.SCREEN_AGENT_MAP[_sn]

    @classmethod
    def get_screenshot_dir(cls):
        """Return ``<case_screenshot_dir>/<suite_name>/<case name>`` of the current case."""
        case = DeccVariable.cur_case()
        return os.path.join(case.case_screenshot_dir, case.suite_name, case.name)

    @classmethod
    def get_take_picture_path(cls, _device, picture_name,
                              ext=".png", exe_type="takeImage"):
        """Build the local save path and file name for a captured artifact.

        If ``picture_name`` is already an existing file, it is returned as-is
        together with its base name. Otherwise the file is placed in the
        current case's screenshot directory and named
        ``<sanitized sn>.<picture_name><counter><ext>``.

        :param _device: device object; its ``device_sn`` prefixes the name
            (``?`` is replaced by ``sn`` and ``:`` by ``_``).
        :param picture_name: base name, with or without the ``ext`` suffix.
        :param ext: file suffix including the leading dot.
        :param exe_type: selects the per-case counter appended to the name:
            ``takeImage`` -> ``image_num``, ``videoRecord`` -> ``video_num``,
            ``stepImage`` -> no counter, anything else (e.g. ``dumpWindow``)
            -> ``dump_xml_num``. The counter used is incremented afterwards.
        :return: ``(full_path, file_name)``
        """
        if os.path.isfile(picture_name):
            folder = os.path.dirname(picture_name)
            create_dir(folder)
            return picture_name, os.path.basename(picture_name)

        folder = cls.get_screenshot_dir()
        create_dir(folder)
        # Remove the suffix itself. str.strip(ext) would treat ext as a set of
        # characters and eat matching characters from both ends of the name.
        if ext and picture_name.endswith(ext):
            picture_name = picture_name[:-len(ext)]

        device_tag = _device.device_sn.replace("?", "sn").replace(":", "_")
        counter_attr = {
            "takeImage": "image_num",
            "videoRecord": "video_num",
            "stepImage": None,
        }.get(exe_type, "dump_xml_num")

        if counter_attr is None:
            save_name = "{}.{}{}".format(device_tag, picture_name, ext)
        else:
            case = DeccVariable.cur_case()
            count = getattr(case, counter_attr)
            save_name = "{}.{}{}{}".format(device_tag, picture_name, count, ext)
            setattr(case, counter_attr, count + 1)

        fol_path = os.path.join(folder, save_name)
        return fol_path, save_name

    @classmethod
    def screen_take_picture(cls, args, result, _ta=None, is_raise_exception=True):
        # When the phone is off, you can set the screenshot off function
        pass

    @staticmethod
    def _swap_suffix(value, old, new):
        """Replace everything from the last ``old`` in ``value`` with ``new``.

        When ``old`` does not occur, ``new`` is appended instead of silently
        dropping the last character (which slicing with ``rfind() == -1`` does).
        """
        idx = value.rfind(old)
        if idx < 0:
            return value + new
        return value[:idx] + new

    @classmethod
    def _do_capture(cls, _device, link, path, ext=".png"):
        """Take a screenshot on the device and pull it to ``path``.

        For devices exposing an ``is_oh`` attribute the capture is done with
        ``snapshot_display`` and saved as ``.jpeg``, so the suffix of both
        ``path`` and ``link`` is rewritten. Other devices use ``screencap -p``.
        The pulled file is then passed to :meth:`compress_image`.

        :return: ``(path, link)`` with the suffixes actually used.
        """
        if hasattr(_device, "is_oh"):
            new_ext = ".jpeg"
            link = cls._swap_suffix(link, ext, new_ext)
            path = cls._swap_suffix(path, ext, new_ext)
            remote = "/data/local/tmp/xdevice_screenshot" + new_ext
            _device.execute_shell_command(
                "snapshot_display -f {}".format(remote), timeout=60000)
            # Adapt to non-root devices: wait briefly before pulling the file.
            if hasattr(_device, "is_root") and not getattr(_device, "is_root", False):
                time.sleep(1)
            _device.pull_file(remote, path)
            _device.execute_shell_command("rm -f {}".format(remote))
        else:
            remote = "/data/local/tmp/screen.png"
            _device.connector.shell("screencap -p {}".format(remote), timeout=60000)
            _device.pull_file(remote, path, timeout=30000)
            _device.execute_shell_command("rm -f {}".format(remote))
        try:
            # Compress the picture (JPEG quality 80 by default).
            cls.compress_image(path)
        except NameError:
            pass
        return path, link

    @classmethod
    def __screen_and_save_picture(cls, _device, name, ext=".png", exe_type="takeImage"):
        """Capture the device screen, save it and log an HTML link to it.

        :param name: picture name; the full save path is derived by
            :meth:`get_image_dir_path`.
        :param ext: picture suffix; ".png" and ".jpg" are supported.
        :return: ``(path, link)``
        """
        path, link = cls.get_image_dir_path(_device, name, ext, exe_type=exe_type)
        # The screenshot file suffix may be changed inside _do_capture.
        path, link = cls._do_capture(_device, link, path, ext)
        _device.log.info(
            '<a href="{}" target="_blank">Screenshot: {}<img style="display: none;" {}/>'
            '</a>'.format(link, path, cls.resize_image(path)))
        return path, link

    @classmethod
    def capture_step_picture(cls, _device, name, ext=".png"):
        """Capture a picture for a test step.

        Currently does nothing and returns ``(None, "")``.

        :param name: picture name.
        :param ext: picture suffix; ".png" and ".jpg" are supported.
        """
        return None, ""

    @classmethod
    def compress_image(cls, img_path, ratio=0.5, quality=80):
        """Downscale the image at ``img_path`` in place and re-encode it as JPEG.

        Does nothing when ``cv2``/``numpy`` are not installed, or when the file
        is empty or cannot be decoded.

        :param img_path: path of the image; it is overwritten.
        :param ratio: scale factor applied to both width and height.
        :param quality: JPEG quality passed to the encoder.
        """
        try:
            import cv2
            import numpy as np
        except ImportError:
            return

        raw = np.fromfile(img_path, dtype=np.uint8)
        if raw.size == 0:
            return
        pic = cv2.imdecode(raw, -1)
        if pic is None:
            # Data could not be decoded as an image; leave the file untouched.
            return
        # shape is (h, w) for single-channel and (h, w, c) for colour images,
        # so only the first two entries are safe to unpack.
        height, width = pic.shape[:2]
        pic = cv2.resize(pic, (int(width * ratio), int(height * ratio)))
        params = [cv2.IMWRITE_JPEG_QUALITY, quality]
        cv2.imencode('.jpeg', pic, params=params)[1].tofile(img_path)

    @classmethod
    def get_image_dir_path(cls, _device, name, ext=".png", exe_type="takeImage"):
        """Return ``(path, link)`` for a new capture named ``name``.

        The device's current date (``%Y%m%d_%H%M%S``) is queried over shell and
        prefixed to ``name``; if that query fails the error is logged and the
        name is used without a prefix. ``link`` is the file name joined to the
        current case name.

        :param exe_type: defaults to ``takeImage``; see
            :meth:`get_take_picture_path` for the accepted values.
        """
        try:
            if hasattr(_device, "is_oh"):
                phone_time = _device.execute_shell_command("date '+%Y%m%d_%H%M%S'").strip()
            else:
                phone_time = _device.connector.shell("date '+%Y%m%d_%H%M%S'").strip()
        except Exception as exception:
            _device.log.error("get date exception error")
            _device.log.debug("get date exception: {}".format(exception))
        else:
            name = "{}.{}".format(phone_time, name)
        path, save_name = cls.get_take_picture_path(_device, name, ext, exe_type)
        link = os.path.join(DeccVariable.cur_case().name, save_name)
        return path, link

    @classmethod
    def resize_image(cls, file_path, max_height=480, file_type="image"):
        """Return HTML ``width="…" height="…"`` attributes for a thumbnail.

        The media size is read from ``file_path`` (PIL for images, cv2 for
        videos). If the file does not exist, or the required package is not
        installed, 1080x1920 is assumed. Media shorter than ``max_height`` keep
        their size; otherwise the size is scaled so the height equals
        ``max_height``.

        :param file_path: local path of the image or video.
        :param max_height: maximum height of the rendered thumbnail.
        :param file_type: ``"image"`` or ``"video"``.
        """
        width, height = 1080, 1920
        try:
            if os.path.exists(file_path):
                if file_type == "image":
                    try:
                        from PIL import Image
                    except ImportError:
                        # PIL is optional here: fall back to the default size
                        # rather than failing the caller's log statement.
                        log.warning("PIL is not installed, use default image size")
                    else:
                        with Image.open(file_path) as img:
                            width, height = img.width, img.height
                elif file_type == "video":
                    try:
                        import cv2
                        video_info = cv2.VideoCapture(file_path)
                        width = int(video_info.get(cv2.CAP_PROP_FRAME_WIDTH))
                        height = int(video_info.get(cv2.CAP_PROP_FRAME_HEIGHT))
                        video_info.release()
                    except Exception as e:
                        log.warning("get video width and height error: {}, use default".format(e))
                    if width == 0 or height == 0:
                        width, height = 1080, 1920
            if height < max_height:
                return 'width="%d" height="%d"' % (width, height)
            ratio = max_height / height
        except ZeroDivisionError:
            log.error("shot image height is 0")
            ratio = 1
        return 'width="%d" height="%d"' % (width * ratio, max_height)

    def terminate(self):
        """Release the resources held by this agent.

        Removes the port forward for ``local_port``, stops ``proc`` and waits
        up to 3 seconds for ``thread`` to finish. ``local_port`` and ``proc``
        are reset afterwards, so repeated calls (``remove_instance`` followed
        by ``__del__``) do not tear the same resources down twice.
        """
        if isinstance(self.local_port, int):
            if hasattr(self._device, "is_oh") or \
                    self._device.usb_type == DeviceConnectorType.hdc:
                self._device.connector_command('fport rm tcp:{}'.format(self.local_port))
            else:
                self._device.connector_command('forward --remove tcp:{}'.format(self.local_port))
            self.local_port = None
        if self.proc is not None:
            stop_standing_subprocess(self.proc)
            self.proc = None
        if self.thread is not None:
            # At the end of the task, wait for picture generation to finish.
            # NOTE(review): assumes self.thread is a threading.Thread - confirm.
            deadline = time.monotonic() + 3
            while self.thread.is_alive() and time.monotonic() < deadline:
                time.sleep(0.1)