1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2021 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18import json 19import os 20import shutil 21import platform 22import subprocess 23 24############################################################################## 25############################################################################## 26 27__all__ = ["DeviceShell"] 28 29import zipfile 30 31if platform.system() != 'Windows': 32 QUOTATION_MARKS = "'" 33else: 34 QUOTATION_MARKS = "\"" 35 36HDC_TOOLS = "hdc" 37 38 39############################################################################## 40############################################################################## 41 42 43def get_package_name(hap_filepath): 44 package_name = "" 45 46 if os.path.exists(hap_filepath): 47 filename = os.path.basename(hap_filepath) 48 49 # unzip the hap file 50 hap_bak_path = os.path.abspath(os.path.join( 51 os.path.dirname(hap_filepath), 52 "%s.bak" % filename)) 53 zf_desc = zipfile.ZipFile(hap_filepath) 54 try: 55 zf_desc.extractall(path=hap_bak_path) 56 except RuntimeError as error: 57 print("Unzip error: ", hap_bak_path) 58 zf_desc.close() 59 60 # verify config.json file 61 app_profile_path = os.path.join(hap_bak_path, "config.json") 62 if os.path.isfile(app_profile_path): 63 load_dict = {} 64 with open(app_profile_path, 'r') as json_file: 65 load_dict = json.load(json_file) 66 profile_list = load_dict.values() 67 for profile in profile_list: 68 package_name = profile.get("package") 69 if not package_name: 70 continue 71 break 72 73 # delete hap_bak_path 74 if os.path.exists(hap_bak_path): 75 shutil.rmtree(hap_bak_path) 76 else: 77 print("file %s not exists" % hap_filepath) 78 79 return package_name 80 81############################################################################## 82############################################################################## 83 84 85class DeviceShell: 86 def __init__(self, conn_type, remote_ip="", device_sn="", repote_port="", name=""): 87 self.conn_type = conn_type 88 self.device_sn = device_sn 89 self.name = name 90 self.test_path = "data/test" 91 if conn_type: 92 self.device_params = self.get_device_hdc_para( 93 device_sn 94 ) 95 else: 96 self.device_params = self.get_device_para( 97 remote_ip, repote_port, device_sn) 98 self.init_device() 99 100 @classmethod 101 def get_device_para(cls, remote_ip="", remote_port="", 102 device_sn=""): 103 if "" == remote_ip or "" == remote_port: 104 if "" == device_sn: 105 device_para = "" 106 else: 107 device_para = "-s %s" % device_sn 108 else: 109 if "" == device_sn: 110 device_para = "-H %s -P %s" % (remote_ip, remote_port) 111 else: 112 device_para = "-H %s -P %s -t %s" % ( 113 remote_ip, remote_port, device_sn) 114 return device_para 115 116 @classmethod 117 def get_device_hdc_para(cls, device_sn=""): 118 if " " == device_sn: 119 device_para = "" 120 else: 121 device_para = "-t %s" % device_sn 122 123 return device_para 124 125 def remount(self): 126 if self.conn_type: 127 remount = "target mount" 128 else: 129 remount = "remount" 130 command = "%s %s %s" % (HDC_TOOLS, self.device_params, remount) 131 self.execute_command(command) 132 133 def init_device(self): 134 self.remount() 135 self.shell('rm -rf %s' % self.test_path) 136 self.shell('mkdir -p %s' % self.test_path) 137 self.shell('chmod 777 -R %s' % self.test_path) 138 self.shell("mount -o rw,remount,rw /%s" % "system") 139 140 def unlock_screen(self): 141 self.shell("svc power stayon true") 142 143 def unlock_device(self): 144 self.shell("input keyevent 82") 145 self.shell("wm dismiss-keyguard") 146 147 def push_file(self, srcpath, despath): 148 if self.conn_type: 149 push_args = "file send" 150 else: 151 push_args = "push" 152 command = "%s %s %s %s %s" % ( 153 HDC_TOOLS, 154 self.device_params, 155 push_args, 156 srcpath, 157 despath) 158 return self.execute_command(command) 159 160 def pull_file(self, srcpath, despath): 161 if self.conn_type: 162 pull_args = "file recv" 163 else: 164 pull_args = "pull" 165 command = "%s %s %s %s %s" % ( 166 HDC_TOOLS, 167 self.device_params, 168 pull_args, 169 srcpath, 170 despath) 171 return self.execute_command(command) 172 173 def lock_screen(self): 174 self.shell("svc power stayon false") 175 176 def disable_keyguard(self): 177 self.unlock_screen() 178 self.unlock_device() 179 180 def shell(self, command=""): 181 return self.execute_command("%s %s shell %s%s%s" % ( 182 HDC_TOOLS, 183 self.device_params, 184 QUOTATION_MARKS, 185 command, 186 QUOTATION_MARKS)) 187 188 @classmethod 189 def execute_command(cls, command, print_flag=True, timeout=900): 190 try: 191 if print_flag: 192 print("command: " + command) 193 if subprocess.call(command, shell=True, timeout=timeout) == 0: 194 print("results: successed") 195 return True 196 except Exception as error: 197 print("Exception: %s" % str(error)) 198 print("results: failed") 199 return False 200 201 def shell_with_output(self, command=""): 202 return self.execute_command_with_output("%s %s shell %s%s%s" % ( 203 HDC_TOOLS, 204 self.device_params, 205 QUOTATION_MARKS, 206 command, 207 QUOTATION_MARKS)) 208 209 @classmethod 210 def execute_command_with_output(cls, command, print_flag=True): 211 if print_flag: 212 print("command: " + command) 213 214 proc = subprocess.Popen(command, 215 stdout=subprocess.PIPE, 216 stderr=subprocess.PIPE, 217 shell=True) 218 219 result = "" 220 try: 221 data, _ = proc.communicate() 222 if isinstance(data, bytes): 223 result = data.decode('utf-8', 'ignore') 224 finally: 225 proc.stdout.close() 226 proc.stderr.close() 227 return result if result else data 228 229 @classmethod 230 def check_path_legal(cls, path): 231 if path and " " in path: 232 return "\"%s\"" % path 233 return path