1#!/usr/bin/env python 2# -*- coding: utf-8 -*- 3# Copyright (c) 2025 Huawei Device Co., Ltd. 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15 16import sys 17import subprocess 18import argparse 19import time 20import os 21import shutil 22import json 23import platform 24from ctypes import c_char_p 25from ctypes import cdll 26 27IS_DEBUG = False 28HDC_NAME = "hdc" 29SYMBOL_FILES_DIR = '/data/local/tmp/local_libs/' 30BUILD_ID_FILE = "build_id_list" 31EXPECTED_TOOLS = { 32 HDC_NAME: { 33 'is_binutils': False, 34 'test_option': 'version', 35 'path_in_tool': '../platform-tools/hdc', 36 } 37} 38 39 40def get_lib(): 41 script_dir = os.path.dirname(os.path.realpath(__file__)) 42 system_type = platform.system().lower() 43 if system_type == "windows": 44 lib_path = os.path.join(script_dir, "bin", system_type, 45 "x86_64", "libhiperf_report.dll") 46 elif system_type == "linux": 47 lib_path = os.path.join(script_dir, "bin", system_type, 48 "x86_64", "libhiperf_report.so") 49 elif system_type == "darwin": 50 lib_path = os.path.join(script_dir, "bin", system_type, 51 "x86_64", "libhiperf_report.dylib") 52 else: 53 print("Un Support System Platform") 54 raise RuntimeError 55 if not os.path.exists(lib_path): 56 print("{} does not exist!".format(lib_path)) 57 raise RuntimeError 58 return cdll.LoadLibrary(lib_path) 59 60 61def remove(files): 62 if os.path.isfile(files): 63 os.remove(files) 64 elif os.path.isdir(files): 65 shutil.rmtree(files, ignore_errors=True) 66 67 68def is_elf_file(path): 69 if os.path.isfile(path): 70 with open(path, 'rb') as file_bin: 71 data = file_bin.read(4) 72 if len(data) == 4 and bytes_to_str(data) == '\x7fELF': 73 return True 74 return False 75 76 77def get_architecture(elf_path): 78 if is_elf_file(elf_path): 79 my_lib = get_lib() 80 my_lib.ReportGetElfArch.restype = c_char_p 81 ret = my_lib.ReportGetElfArch(elf_path.encode()) 82 return ret.decode() 83 return 'unknown' 84 85 86def get_build_id(elf_path): 87 if is_elf_file(elf_path): 88 my_lib = get_lib() 89 my_lib.ReportGetBuildId.restype = c_char_p 90 ret = my_lib.ReportGetBuildId(elf_path.encode()) 91 return ret.decode() 92 return '' 93 94 95def get_hiperf_binary_path(arch, binary_name): 96 if arch == 'aarch64': 97 arch = 'arm64' 98 script_dir = os.path.dirname(os.path.realpath(__file__)) 99 arch_dir = os.path.join(script_dir, "bin", "ohos", arch) 100 if not os.path.isdir(arch_dir): 101 raise Exception("can't find arch directory: %s" % arch_dir) 102 binary_path = os.path.join(arch_dir, binary_name) 103 if not os.path.isfile(binary_path): 104 raise Exception("can't find binary: %s" % binary_path) 105 return binary_path 106 107 108def str_to_bytes(str_value): 109 if not (sys.version_info >= (3, 0)): 110 return str_value 111 return str_value.encode('utf-8') 112 113 114def bytes_to_str(bytes_value): 115 if not bytes_value: 116 return '' 117 if not (sys.version_info >= (3, 0)): 118 return bytes_value 119 return bytes_value.decode('utf-8') 120 121 122def executable_file_available(executable, option='--help'): 123 try: 124 print([executable, option]) 125 subproc = subprocess.Popen([executable, option], 126 stdout=subprocess.PIPE, 127 stderr=subprocess.PIPE) 128 subproc.communicate(timeout=5) 129 return subproc.returncode == 0 130 except OSError: 131 return False 132 133 134def dir_check(arg): 135 path = os.path.realpath(arg) 136 if not os.path.isdir(path): 137 raise argparse.ArgumentTypeError('{} is not a directory.'.format(path)) 138 return path 139 140 141def file_check(arg): 142 path = os.path.realpath(arg) 143 if not os.path.isfile(path): 144 raise argparse.ArgumentTypeError('{} is not a file.'.format(path)) 145 return path 146 147 148def get_arg_list(arg_list): 149 res = [] 150 if arg_list: 151 for arg in arg_list: 152 res += arg 153 return res 154 155 156def get_arch(arch): 157 if 'aarch64' in arch: 158 return 'arm64' 159 if 'arm' in arch: 160 return 'arm' 161 if 'x86_64' in arch or "amd64" in arch: 162 return 'x86_64' 163 if '86' in arch: 164 return 'x86' 165 raise Exception('unsupported architecture: %s' % arch.strip()) 166 167 168def find_tool_path(tool, tool_path=None): 169 if tool not in EXPECTED_TOOLS: 170 return None 171 tool_info = EXPECTED_TOOLS[tool] 172 test_option = tool_info.get('test_option', '--help') 173 path_in_tool = tool_info['path_in_tool'] 174 path_in_tool = path_in_tool.replace('/', os.sep) 175 176 # 1. Find tool in the given tool path. 177 if tool_path: 178 path = os.path.join(tool_path, path_in_tool) 179 if executable_file_available(path, test_option): 180 return path 181 182 # 2. Find tool in the tool directory containing hiperf scripts. 183 path = os.path.join('../..', path_in_tool) 184 if executable_file_available(path, test_option): 185 return path 186 187 # 3. Find tool in $PATH. 188 if executable_file_available(tool, test_option): 189 return tool 190 191 return None 192 193 194def get_used_binaries(perf_data, report_file, local_lib_dir): 195 if local_lib_dir: 196 get_lib().ReportUnwindJson(perf_data.encode("utf-8"), 197 'json.txt'.encode("utf-8"), 198 local_lib_dir.encode("utf-8")) 199 else: 200 get_lib().ReportJson(perf_data.encode("utf-8"), 201 'json.txt'.encode("utf-8")) 202 time.sleep(2) 203 with open('json.txt', 'r') as json_file: 204 all_json = json_file.read() 205 with open('./testModule/report.html', 'r', encoding='utf-8') as html_file: 206 html_str = html_file.read() 207 with open(report_file, 'w', encoding='utf-8', mode='0o666') as report_html_file: 208 report_html_file.write(html_str + all_json + '</script>' 209 ' </body>' 210 ' </html>') 211 dirname, _ = os.path.split(os.path.abspath(sys.argv[0])) 212 abs_path = os.path.join(dirname, report_file) 213 print("save to %s success" % abs_path) 214 215 216def validate_json(json_data): 217 try: 218 json.loads(json_data) 219 return True 220 except ValueError: 221 return False 222 223 224def get_shell_result(cmd, words, ret): 225 print(f"\nexecuting command: {cmd}") 226 output = subprocess.check_output(cmd.split()).decode() 227 print(f"\noutput: {output}") 228 if ret: 229 assert words in output 230 else: 231 assert words not in output 232 233 234def get_path_by_attribute(tree, key, value): 235 attributes = tree['attributes'] 236 if attributes is None: 237 print('tree contains no attributes') 238 return None 239 path = [] 240 if attributes.get(key) == value: 241 return path 242 for index, child in enumerate(tree['children']): 243 child_path = path + [index] 244 result = get_path_by_attribute(child, key, value) 245 if result is not None: 246 return child_path + result 247 return None 248 249 250def get_element_by_path(tree, path): 251 if len(path) == 1: 252 return tree['children'][path[0]] 253 return get_element_by_path(tree['children'][path[0]], path[1:]) 254 255 256def get_location_by_text(tree, text): 257 path = get_path_by_attribute(tree, 'text', text) 258 if path is None or len(path) == 0: 259 print('text not found in layout file') 260 element = get_element_by_path(tree, path) 261 locations = element['attributes']['bounds'].replace('[', ' ').replace(']', ' ').replace(',', ' ').strip().split() 262 return (int(locations[0] + locations[2]) / 1000) + 20, (int(locations[1] + locations[3]) / 10000) + 10 263 264 265def touch(dx, dy): 266 print('---------------------------') 267 print(dx) 268 print(dy) 269 print('---------------------------') 270 output = subprocess.check_output(f"hdc shell uitest uiInput click {dx} {dy}") 271 272 273def get_layout_tree(): 274 output = subprocess.check_output("hdc shell uitest dumpLayout", text=True) 275 path = output.split(':')[-1] 276 print(path) 277 cmd = 'hdc file recv ' + path 278 print(cmd) 279 subprocess.call('hdc file recv ' + path) 280 subprocess.call('hdc shell rm ' + path) 281 path = path[path.find('layout'):] 282 print(path[:len(path) - 1]) 283 with open(path[:len(path) - 1], encoding='utf-8') as f: 284 tree = json.load(f) 285 return tree 286 287 288def touch_button_by_text(text): 289 layout_tree = get_layout_tree() 290 location = get_location_by_text(layout_tree, text) 291 touch(location[0], location[1]) 292 293 294class HdcInterface: 295 def __init__(self, root_authority=True): 296 hdc_path = find_tool_path(HDC_NAME) 297 if not hdc_path: 298 raise Exception("Can't find hdc in PATH environment.") 299 self.hdc_path = hdc_path 300 self.root_authority = root_authority 301 302 def run_hdc_cmd(self, hdc_args, log_output=True): 303 hdc_args = [self.hdc_path] + hdc_args 304 if IS_DEBUG: 305 print('run hdc cmd: %s' % hdc_args) 306 subproc = subprocess.Popen(hdc_args, stdout=subprocess.PIPE) 307 (out, _) = subproc.communicate() 308 out = bytes_to_str(out) 309 return_code = subproc.returncode 310 result = (return_code == 0) 311 if out and hdc_args[1] != 'file send' and \ 312 hdc_args[1] != 'file recv': 313 if log_output: 314 print(out) 315 if IS_DEBUG: 316 print('run hdc cmd: %s [result %s]' % (hdc_args, result)) 317 return result, out 318 319 def check_run(self, hdc_args): 320 result, out = self.run_hdc_cmd(hdc_args) 321 if not result: 322 raise Exception('run "hdc %s" failed' % hdc_args) 323 return out 324 325 def switch_root(self): 326 if not self.root_authority: 327 self._not_use_root() 328 return False 329 result, out = self.run_hdc_cmd(['shell', 'whoami']) 330 if not result: 331 return False 332 if 'root' in out: 333 return True 334 build_type = self.get_attribute('ro.build.type') 335 if build_type == 'user': 336 return False 337 self.run_hdc_cmd(['root']) 338 time.sleep(1) 339 self.run_hdc_cmd(['wait-for-device']) 340 result, out = self.run_hdc_cmd(['shell', 'whoami']) 341 return result and 'root' in out 342 343 def get_attribute(self, name): 344 result, out = self.run_hdc_cmd(['shell', 'getprop', 345 name]) 346 if result: 347 return out 348 349 def get_device_architecture(self): 350 output = self.check_run(['shell', 'uname', '-m']) 351 return get_arch(output) 352 353 def _not_use_root(self): 354 result, out = self.run_hdc_cmd(['shell', 'whoami']) 355 if not result or 'root' not in out: 356 return 357 print('unroot hdc') 358 self.run_hdc_cmd(['unroot']) 359 time.sleep(1) 360 self.run_hdc_cmd(['wait-for-device']) 361 362 363class ElfStruct: 364 def __init__(self, path, lib_name): 365 self.path = path 366 self.name = lib_name 367 368 369class LocalLibDownload: 370 371 def __init__(self, device_arch, hdc): 372 self.hdc = hdc 373 self.device_arch = device_arch 374 375 self.build_id_map_of_host = {} 376 self.build_id_map_of_device = {} 377 self.host_lib_count_map = {} 378 self.request_architectures = self.__get_need_architectures(device_arch) 379 380 @classmethod 381 def __get_need_architectures(self, device_arch): 382 if device_arch == 'x86_64': 383 return ['x86', 'x86_64'] 384 if device_arch == 'x86': 385 return ['x86'] 386 if device_arch == 'arm64': 387 return ['arm', 'arm64'] 388 if device_arch == 'arm': 389 return ['arm'] 390 return [] 391 392 def get_host_local_libs(self, local_lib_dir): 393 self.build_id_map_of_host.clear() 394 for root, dirs, files in os.walk(local_lib_dir): 395 for name in files: 396 if name.endswith('.so'): 397 self.__append_host_local_lib(os.path.join(root, name), name) 398 399 def get_device_local_libs(self): 400 self.build_id_map_of_device.clear() 401 if os.path.exists(BUILD_ID_FILE): 402 remove(BUILD_ID_FILE) 403 self.hdc.check_run(['shell', 'mkdir', '-p', SYMBOL_FILES_DIR]) 404 self.hdc.run_hdc_cmd(['file recv', SYMBOL_FILES_DIR + BUILD_ID_FILE]) 405 if os.path.isfile(BUILD_ID_FILE): 406 with open(BUILD_ID_FILE, 'rb') as file_bin: 407 for line in file_bin.readlines(): 408 line = bytes_to_str(line).strip() 409 items = line.split('=') 410 if len(items) == 2: 411 self.build_id_map_of_device[items[0]] = items[1] 412 remove(BUILD_ID_FILE) 413 414 def update_device_local_libs(self): 415 # Send local libs to device. 416 for build_id in self.build_id_map_of_host: 417 if build_id not in self.build_id_map_of_device: 418 elf_struct = self.build_id_map_of_host[build_id] 419 self.hdc.check_run(['file send', elf_struct.path, 420 SYMBOL_FILES_DIR + elf_struct.name]) 421 422 # Remove Device lib while local libs not exist on host. 423 for build_id in self.build_id_map_of_device: 424 if build_id not in self.build_id_map_of_host: 425 name = self.build_id_map_of_device[build_id] 426 self.hdc.run_hdc_cmd(['shell', 'rm', SYMBOL_FILES_DIR + name]) 427 428 # Send new build_id_list to device. 429 with open(BUILD_ID_FILE, 'wb') as file_bin: 430 for build_id in self.build_id_map_of_host: 431 str_bytes = str_to_bytes('%s=%s\n' % (build_id, 432 self.build_id_map_of_host[ 433 build_id].name)) 434 file_bin.write(str_bytes) 435 436 self.hdc.check_run(['file send', BUILD_ID_FILE, 437 SYMBOL_FILES_DIR + BUILD_ID_FILE]) 438 remove(BUILD_ID_FILE) 439 440 def __append_host_local_lib(self, path, name): 441 build_id = get_build_id(path) 442 if not build_id: 443 return 444 arch = get_architecture(path) 445 if arch not in self.request_architectures: 446 return 447 448 elf_struct = self.build_id_map_of_host.get(build_id) 449 if not elf_struct: 450 count = self.host_lib_count_map.get(name, 0) 451 self.host_lib_count_map[name] = count + 1 452 if count == 0: 453 unique_name = name 454 else: 455 unique_name = name + '_' + count 456 self.build_id_map_of_host[build_id] = ElfStruct(path, 457 unique_name) 458 else: 459 elf_struct.path = path 460 461 462class PerformanceProfile: 463 """Class of all Profilers.""" 464 465 def __init__(self, args, control_module=""): 466 self.args = args 467 self.hdc = HdcInterface(root_authority=not args.not_hdc_root) 468 self.device_root = self.hdc.switch_root() 469 self.device_arch = self.hdc.get_device_architecture() 470 self.record_subproc = None 471 self.is_control = bool(control_module) 472 self.control_mode = control_module 473 474 def profile(self): 475 if not self.is_control or self.control_mode == "prepare": 476 print('prepare profiling') 477 self.download() 478 print('start profiling') 479 if not self.combine_args(): 480 return 481 else: 482 self.exec_control() 483 self.profiling() 484 485 if not self.is_control: 486 print('pull profiling data') 487 self.get_profiling_data() 488 if self.control_mode == "stop": 489 self.wait_data_generate_done() 490 print('profiling is finished.') 491 492 def download(self): 493 """Prepare recording. """ 494 if self.args.local_lib_dir: 495 self.download_libs() 496 497 def download_libs(self): 498 executor = LocalLibDownload(self.device_arch, self.hdc) 499 executor.get_host_local_libs(self.args.local_lib_dir) 500 executor.get_device_local_libs() 501 executor.update_device_local_libs() 502 503 def combine_args(self): 504 if self.args.package_name: 505 if self.args.ability: 506 self.kill_process() 507 self.start_profiling(['--app', self.args.package_name]) 508 if self.args.ability: 509 ability = self.args.package_name + '/' + self.args.ability 510 start_cmd = ['shell', 'aa', 'start', '-a', ability] 511 result = self.hdc.run_hdc_cmd(start_cmd) 512 if not result: 513 self.record_subproc.terminate() 514 print("Can't start ability %s" % ability) 515 return False 516 # else: no need to start an ability. 517 elif self.args.local_program: 518 pid = self.hdc.check_run(['shell', 'pidof', 519 self.args.local_program]) 520 if not pid: 521 print("Can't find pid of %s" % self.args.local_program) 522 return False 523 pid = int(pid) 524 self.start_profiling(['-p', str(pid)]) 525 elif self.args.cmd: 526 cmds = self.args.cmd.split(' ') 527 cmd = [cmd.replace("'", "") for cmd in cmds] 528 self.start_profiling(cmd) 529 elif self.args.pid: 530 self.start_profiling(['-p', ','.join(self.args.pid)]) 531 elif self.args.tid: 532 self.start_profiling(['-t', ','.join(self.args.tid)]) 533 elif self.args.system_wide: 534 self.start_profiling(['-a']) 535 return True 536 537 def kill_process(self): 538 if self.get_app_process(): 539 self.hdc.check_run(['shell', 'aa', 'force-stop', self.args.app]) 540 count = 0 541 while True: 542 time.sleep(1) 543 pid = self.get_app_process() 544 if not pid: 545 break 546 count += 1 547 # 3 seconds exec kill 548 if count >= 3: 549 self.run_in_app_dir(['kill', '-9', str(pid)]) 550 551 def get_app_process(self): 552 result, output = self.hdc.run_hdc_cmd( 553 ['shell', 'pidof', self.args.package_name]) 554 print(output) 555 print(result) 556 return int(output) if result else None 557 558 def run_in_app_dir(self, args): 559 if self.device_root: 560 hdc_args = ['shell', 'cd /data/data/' + self.args.package_name + 561 ' && ' + (' '.join(args))] 562 else: 563 hdc_args = ['shell', 'run-as', self.args.package_name] + args 564 return self.hdc.run_hdc_cmd(hdc_args, log_output=False) 565 566 def start_profiling(self, selected_args): 567 """Start hiperf reocrd process on device.""" 568 self.hdc.run_hdc_cmd(['shell', 'rm', 569 '/data/local/tmp/perf.data']) 570 record_options = self.args.record_options.split(' ') 571 record_options = [cmd.replace("'", "") for cmd in record_options] 572 if self.is_control: 573 args = ['hiperf', 'record', 574 '--control', self.control_mode, '-o', 575 '/data/local/tmp/perf.data'] + record_options 576 else: 577 args = ['hiperf', 'record', '-o', 578 '/data/local/tmp/perf.data'] + record_options 579 if self.args.local_lib_dir and self.hdc.run_hdc_cmd( 580 ['shell', 'ls', SYMBOL_FILES_DIR]): 581 args += ['--symbol-dir', SYMBOL_FILES_DIR] 582 args += selected_args 583 hdc_args = [self.hdc.hdc_path, 'shell'] + args 584 print('run hdc cmd: %s' % hdc_args) 585 self.record_subproc = subprocess.Popen(hdc_args) 586 587 def profiling(self): 588 """ 589 Wait until profiling finishes, or stop profiling when user 590 presses Ctrl-C. 591 """ 592 593 try: 594 return_code = self.record_subproc.wait() 595 except KeyboardInterrupt: 596 self.end_profiling() 597 self.record_subproc = None 598 return_code = 0 599 print('profiling result [%s]' % (return_code == 0)) 600 if return_code != 0: 601 raise Exception('Failed to record profiling data.') 602 603 def end_profiling(self): 604 """ 605 Stop profiling by sending SIGINT to hiperf, and wait until it exits 606 to make sure perf.data is completely generated. 607 """ 608 has_killed = False 609 while True: 610 (result, out) = self.hdc.run_hdc_cmd( 611 ['shell', 'pidof', 'hiperf']) 612 if not out: 613 break 614 if not has_killed: 615 has_killed = True 616 self.hdc.run_hdc_cmd(['shell', 'pkill', 617 '-l', '2', 'hiperf']) 618 time.sleep(1) 619 620 def get_profiling_data(self): 621 current_path = os.getcwd() 622 full_path = os.path.join(current_path, self.args.output_perf_data) 623 self.hdc.check_run(['file recv', '/data/local/tmp/perf.data', 624 full_path]) 625 self.hdc.run_hdc_cmd(['shell', 'rm', 626 '/data/local/tmp/perf.data']) 627 628 def exec_control(self): 629 hdc_args = [self.hdc.hdc_path, 'shell', 630 'hiperf', 'record', 631 '--control', self.control_mode] 632 print('run hdc cmd: %s' % hdc_args) 633 self.record_subproc = subprocess.Popen(hdc_args) 634 635 def wait_data_generate_done(self): 636 last_size = 0 637 while True: 638 (result, out) = self.hdc.run_hdc_cmd( 639 ['shell', 'du', 'data/local/tmp/perf.data']) 640 if "du" not in out: 641 current_size = out.split(" ")[0] 642 if current_size == last_size: 643 self.get_profiling_data() 644 break 645 else: 646 last_size = current_size 647 else: 648 print("not generate perf.data") 649 break 650 651 time.sleep(1) 652