1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2021 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import os 20import time 21import platform 22import subprocess 23 24############################################################################## 25############################################################################## 26 27__all__ = ["DeviceAdapter", "HDCDeviceAdapter"] 28 29if platform.system() != 'Windows': 30 QUOTATION_MARKS = "'" 31else: 32 QUOTATION_MARKS = "\"" 33USB_TOOLS = "hdc" 34HDC_TOOLS = "hdc" 35 36 37############################################################################## 38############################################################################## 39 40 41def get_package_name(hap_filepath): 42 package_name = "" 43 44 if os.path.exists(hap_filepath): 45 filename = os.path.basename(hap_filepath) 46 47 # unzip the hap file 48 hap_bak_path = os.path.abspath(os.path.join( 49 os.path.dirname(hap_filepath), 50 "%s.bak" % filename)) 51 zf_desc = zipfile.ZipFile(hap_filepath) 52 try: 53 zf_desc.extractall(path=hap_bak_path) 54 except RuntimeError as error: 55 print(error) 56 zf_desc.close() 57 58 # verify config.json file 59 app_profile_path = os.path.join(hap_bak_path, "config.json") 60 if os.path.isfile(app_profile_path): 61 load_dict = {} 62 with open(app_profile_path, 'r') as json_file: 63 load_dict = json.load(json_file) 64 profile_list = load_dict.values() 65 for profile in profile_list: 66 package_name = profile.get("package") 67 if not package_name: 68 continue 69 break 70 71 # delete hap_bak_path 72 if os.path.exists(hap_bak_path): 73 shutil.rmtree(hap_bak_path) 74 else: 75 print("file %s not exists" % hap_filepath) 76 77 return package_name 78 79 80############################################################################## 81############################################################################## 82 83 84class DeviceAdapter: 85 def __init__(self, remote_ip="", repote_port="", device_sn="", name=""): 86 self.device_sn = device_sn 87 self.name = name 88 self.test_path = "/%s/%s" % ("data", "test") 89 self.device_para = self.get_device_para( 90 remote_ip, 91 repote_port, 92 device_sn) 93 self.init_device() 94 95 ############################################################### 96 ############################################################### 97 98 def init_device(self): 99 self.remount() 100 self.shell('rm -rf %s' % self.test_path) 101 self.shell('mkdir -p %s' % self.test_path) 102 self.shell('chmod 777 %s' % self.test_path) 103 self.shell("mount -o rw,remount,rw /%s" % "system") 104 105 def remount(self): 106 command = "%s %s remount" % (USB_TOOLS, self.device_para) 107 self.execute_command(command) 108 109 def push_file(self, srcpath, despath): 110 command = "%s %s push %s %s" % ( 111 USB_TOOLS, 112 self.device_para, 113 srcpath, 114 despath) 115 return self.execute_command(command) 116 117 def pull_file(self, srcpath, despath): 118 command = "%s %s pull %s %s" % ( 119 USB_TOOLS, 120 self.device_para, 121 srcpath, 122 despath) 123 return self.execute_command(command) 124 125 def unlock_screen(self): 126 self.shell("svc power stayon true") 127 128 def unlock_device(self): 129 self.shell("input keyevent 82") 130 self.shell("wm dismiss-keyguard") 131 132 def lock_screen(self): 133 self.shell("svc power stayon false") 134 135 def disable_keyguard(self): 136 self.unlock_screen() 137 self.unlock_device() 138 139 def install_hap(self, suite_file): 140 file_name = os.path.basename(suite_file) 141 message = self.shell_with_output("bm install -p %s" % os.path.join( 142 self.test_path, file_name)) 143 message = str(message).rstrip() 144 if message != "": 145 print(message) 146 if message == "" or "Success" in message: 147 return_code = True 148 else: 149 return_code = False 150 time.sleep(1) 151 return return_code 152 153 def uninstall_hap(self, suite_file): 154 package_name = get_package_name(suite_file) 155 result = self.shell("pm uninstall %s" % package_name) 156 time.sleep(1) 157 return result 158 159 def install_app(self, file_path): 160 command = "%s %s install %s" % ( 161 USB_TOOLS, 162 self.device_para, 163 file_path) 164 message = self.execute_command(command) 165 message = str(message).rstrip() 166 if message != "": 167 print(message) 168 if message == "" or "Success" in message: 169 return_code = True 170 else: 171 return_code = False 172 time.sleep(1) 173 return return_code 174 175 def uninstall_app(self, package_name): 176 command = "pm uninstall %s" % (package_name) 177 return_code = self.shell_with_output(command) 178 if return_code: 179 time.sleep(1) 180 return return_code 181 182 183 ############################################################### 184 ############################################################### 185 186 @classmethod 187 def get_device_para(cls, remote_ip="", remote_port="", 188 device_sn=""): 189 if "" == remote_ip or "" == remote_port: 190 if "" == device_sn: 191 device_para = "" 192 else: 193 device_para = "-s %s" % device_sn 194 else: 195 if "" == device_sn: 196 device_para = "-H %s -P %s" % (remote_ip, remote_port) 197 else: 198 device_para = "-H %s -P %s -s %s" % ( 199 remote_ip, remote_port, device_sn) 200 return device_para 201 202 def execute_command(self, command, print_flag=True, timeout=900): 203 try: 204 if print_flag: 205 print("command: " + command) 206 if subprocess.call(command, shell=True, timeout=timeout) == 0: 207 print("results: successed") 208 return True 209 except Exception as error: 210 print("Exception: %s" % str(error)) 211 print("results: failed") 212 return False 213 214 def execute_command_with_output(self, command, print_flag=True): 215 if print_flag: 216 print("command: " + command) 217 218 proc = subprocess.Popen(command, 219 stdout=subprocess.PIPE, 220 stderr=subprocess.PIPE, 221 shell=True) 222 223 try: 224 data, _ = proc.communicate() 225 if isinstance(data, bytes): 226 data = data.decode('utf-8', 'ignore') 227 finally: 228 proc.stdout.close() 229 proc.stderr.close() 230 return data 231 232 def shell(self, command=""): 233 return self.execute_command("%s %s shell %s%s%s" % ( 234 USB_TOOLS, 235 self.device_para, 236 QUOTATION_MARKS, 237 command, 238 QUOTATION_MARKS)) 239 240 def execute_shell_command(self, command): 241 return self.shell(command) 242 243 def shell_with_output(self, command=""): 244 return self.execute_command_with_output("%s %s shell %s%s%s" % ( 245 USB_TOOLS, 246 self.device_para, 247 QUOTATION_MARKS, 248 command, 249 QUOTATION_MARKS)) 250 251 def check_path_legal(self, path): 252 if path and " " in path: 253 return "\"%s\"" % path 254 return path 255 256 def is_file_exist(self, file_path): 257 file_path = self.check_path_legal(file_path) 258 message = self.shell_with_output("ls %s" % file_path) 259 return False if message == "" else True 260 261 262############################################################################## 263############################################################################## 264 265 266class HDCDeviceAdapter: 267 def __init__(self, remote_ip="", repote_port="", device_sn="", name=""): 268 self.device_sn = device_sn 269 self.name = name 270 self.test_path = "/%s/%s/" % ("data", "test") 271 self.device_para = self.get_device_para( 272 remote_ip, 273 repote_port, 274 device_sn) 275 self.init_device() 276 277 ############################################################### 278 ############################################################### 279 280 def init_device(self): 281 self.remount() 282 self.shell('rm -rf %s' % self.test_path) 283 self.shell('mkdir -p %s' % self.test_path) 284 self.shell('chmod 777 %s' % self.test_path) 285 self.shell("mount -o rw,remount,rw /%s" % "system") 286 287 def remount(self): 288 command = "%s %s target mount" % (HDC_TOOLS) 289 self.execute_command(command) 290 291 def push_file(self, srcpath, despath): 292 command = "%s %s file send %s %s" % ( 293 HDC_TOOLS, 294 self.device_para, 295 srcpath, 296 despath) 297 return self.execute_command(command) 298 299 def pull_file(self, srcpath, despath): 300 command = "%s %s file recv %s %s" % ( 301 HDC_TOOLS, 302 self.device_para, 303 srcpath, 304 despath) 305 return self.execute_command(command) 306 307 def unlock_screen(self): 308 self.shell("svc power stayon true") 309 310 def unlock_device(self): 311 self.shell("input keyevent 82") 312 self.shell("wm dismiss-keyguard") 313 314 def lock_screen(self): 315 self.shell("svc power stayon false") 316 317 def disable_keyguard(self): 318 self.unlock_screen() 319 self.unlock_device() 320 321 322 ############################################################### 323 ############################################################### 324 325 @classmethod 326 def get_device_para(cls, remote_ip="", remote_port="", 327 device_sn=""): 328 if "" == remote_ip or "" == remote_port: 329 if "" == device_sn: 330 device_para = "" 331 else: 332 device_para = "-t %s" % device_sn 333 else: 334 if "" == device_sn: 335 device_para = "-s tcp:%s:%s" % (remote_ip, remote_port) 336 else: 337 device_para = "-s tcp:%s:%s -t %s" % ( 338 remote_ip, remote_port, device_sn) 339 return device_para 340 341 def execute_command(self, command, print_flag=True, timeout=900): 342 try: 343 if print_flag: 344 print("command: " + command) 345 if subprocess.call(command, shell=True, timeout=timeout) == 0: 346 print("results: successed") 347 return True 348 except Exception as error: 349 print("Exception: %s" % str(error)) 350 print("results: failed") 351 return False 352 353 def execute_command_with_output(self, command, print_flag=True): 354 if print_flag: 355 print("command: " + command) 356 357 proc = subprocess.Popen(command, 358 stdout=subprocess.PIPE, 359 stderr=subprocess.PIPE, 360 shell=True) 361 362 try: 363 data, _ = proc.communicate() 364 if isinstance(data, bytes): 365 data = data.decode('utf-8', 'ignore') 366 finally: 367 proc.stdout.close() 368 proc.stderr.close() 369 return data 370 371 def shell(self, command=""): 372 return self.execute_command("%s %s shell %s%s%s" % ( 373 HDC_TOOLS, 374 self.device_para, 375 QUOTATION_MARKS, 376 command, 377 QUOTATION_MARKS)) 378 379 def shell_with_output(self, command=""): 380 return self.execute_command_with_output("%s %s shell %s%s%s" % ( 381 HDC_TOOLS, 382 self.device_para, 383 QUOTATION_MARKS, 384 command, 385 QUOTATION_MARKS)) 386 387 def check_path_legal(self, path): 388 if path and " " in path: 389 return "\"%s\"" % path 390 return path 391 392 def is_file_exist(self, file_path): 393 file_path = self.check_path_legal(file_path) 394 message = self.shell_with_output("ls %s" % file_path) 395 return False if message == "" else True 396 397 398############################################################################## 399############################################################################## 400 401