• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#   Copyright (c) 2025 Huawei Device Co., Ltd.
4#   Licensed under the Apache License, Version 2.0 (the "License");
5#   you may not use this file except in compliance with the License.
6#   You may obtain a copy of the License at
7#
8#       http://www.apache.org/licenses/LICENSE-2.0
9#
10#   Unless required by applicable law or agreed to in writing, software
11#   distributed under the License is distributed on an "AS IS" BASIS,
12#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13#   See the License for the specific language governing permissions and
14#   limitations under the License.
15
16import sys
17import subprocess
18import argparse
19import time
20import os
21import shutil
22import json
23import platform
24from ctypes import c_char_p
25from ctypes import cdll
26
# When True, HdcInterface.run_hdc_cmd prints every hdc command and its result.
IS_DEBUG = False
# Name of the hdc executable; also used as the key into EXPECTED_TOOLS.
HDC_NAME = "hdc"
# Device-side directory that receives the symbol files and the build-id list.
SYMBOL_FILES_DIR = '/data/local/tmp/local_libs/'
# Name of the "build_id=lib_name" index file stored in SYMBOL_FILES_DIR.
BUILD_ID_FILE = "build_id_list"
# Per-tool lookup information read by find_tool_path:
#   test_option  - argument used to probe whether the executable runs
#   path_in_tool - location of the executable relative to a tool directory
EXPECTED_TOOLS = {
    HDC_NAME: {
        'is_binutils': False,
        'test_option': 'version',
        'path_in_tool': '../platform-tools/hdc',
    }
}
38
39
def get_lib():
    """Load the host-side hiperf report shared library through ctypes.

    The library is looked up under ``<script dir>/bin/<system>/x86_64/``,
    where ``<system>`` is the lower-cased ``platform.system()`` value.

    Returns:
        The object returned by ``cdll.LoadLibrary`` for the library file.

    Raises:
        RuntimeError: The host system is not windows/linux/darwin, or the
            library file does not exist (a message is printed first).
    """
    lib_names = {
        "windows": "libhiperf_report.dll",
        "linux": "libhiperf_report.so",
        "darwin": "libhiperf_report.dylib",
    }
    host_system = platform.system().lower()
    if host_system not in lib_names:
        print("Un Support System Platform")
        raise RuntimeError

    base_dir = os.path.dirname(os.path.realpath(__file__))
    lib_path = os.path.join(base_dir, "bin", host_system, "x86_64",
                            lib_names[host_system])
    if os.path.exists(lib_path):
        return cdll.LoadLibrary(lib_path)
    print("{} does not exist!".format(lib_path))
    raise RuntimeError
59
60
def remove(files):
    """Delete ``files`` from the host file system.

    A regular file is unlinked; a directory is removed recursively with
    errors ignored. Any other path (for example a missing one) is left
    untouched.
    """
    if os.path.isfile(files):
        os.remove(files)
        return
    if os.path.isdir(files):
        shutil.rmtree(files, ignore_errors=True)
66
67
def is_elf_file(path):
    """Return True if ``path`` is a regular file that starts with the ELF magic.

    The first four bytes are compared as raw bytes. Decoding them as UTF-8
    first would raise UnicodeDecodeError for arbitrary binary files whose
    header is not valid UTF-8, instead of simply reporting "not ELF".

    Args:
        path: Host path to inspect.

    Returns:
        True when the file exists and begins with ``b'\\x7fELF'``,
        otherwise False.
    """
    if not os.path.isfile(path):
        return False
    with open(path, 'rb') as file_bin:
        return file_bin.read(4) == b'\x7fELF'
75
76
def get_architecture(elf_path):
    """Return the architecture string of an ELF file.

    The value comes from ``ReportGetElfArch`` in the hiperf report library.

    Args:
        elf_path: Host path of the file to inspect.

    Returns:
        The decoded architecture string, or ``'unknown'`` when the path is
        not an ELF file or the library returns no string.
    """
    if not is_elf_file(elf_path):
        return 'unknown'
    report_lib = get_lib()
    report_lib.ReportGetElfArch.restype = c_char_p
    arch = report_lib.ReportGetElfArch(elf_path.encode())
    # ctypes maps a NULL char* to None; calling .decode() on it would raise.
    if arch is None:
        return 'unknown'
    return arch.decode()
84
85
def get_build_id(elf_path):
    """Return the build id of an ELF file.

    The value comes from ``ReportGetBuildId`` in the hiperf report library.

    Args:
        elf_path: Host path of the file to inspect.

    Returns:
        The decoded build id, or ``''`` when the path is not an ELF file or
        the library returns no string.
    """
    if not is_elf_file(elf_path):
        return ''
    report_lib = get_lib()
    report_lib.ReportGetBuildId.restype = c_char_p
    build_id = report_lib.ReportGetBuildId(elf_path.encode())
    # ctypes maps a NULL char* to None; treat it like "no build id".
    if build_id is None:
        return ''
    return build_id.decode()
93
94
def get_hiperf_binary_path(arch, binary_name):
    """Locate a device binary shipped under ``<script dir>/bin/ohos/<arch>``.

    ``'aarch64'`` is treated as ``'arm64'``.

    Args:
        arch: Architecture directory name.
        binary_name: File name of the binary inside that directory.

    Returns:
        The path of the binary.

    Raises:
        Exception: The architecture directory or the binary is missing.
    """
    arch_name = 'arm64' if arch == 'aarch64' else arch
    base_dir = os.path.dirname(os.path.realpath(__file__))
    arch_dir = os.path.join(base_dir, "bin", "ohos", arch_name)
    if not os.path.isdir(arch_dir):
        raise Exception("can't find arch directory: %s" % arch_dir)

    binary_path = os.path.join(arch_dir, binary_name)
    if os.path.isfile(binary_path):
        return binary_path
    raise Exception("can't find binary: %s" % binary_path)
106
107
def str_to_bytes(str_value):
    """Encode ``str_value`` as UTF-8 bytes (returned unchanged on Python 2)."""
    if sys.version_info < (3, 0):
        return str_value
    return str_value.encode('utf-8')
112
113
def bytes_to_str(bytes_value):
    """Decode UTF-8 bytes to text.

    Any falsy input (``None``, ``b''``) yields ``''``. On Python 2 the value
    is returned unchanged.
    """
    if not bytes_value:
        return ''
    is_python3 = sys.version_info >= (3, 0)
    return bytes_value.decode('utf-8') if is_python3 else bytes_value
120
121
def executable_file_available(executable, option='--help'):
    """Check whether ``executable`` can be started and exits successfully.

    The command ``[executable, option]`` is run with its output captured and
    is given five seconds to finish.

    Args:
        executable: Program name or path.
        option: Single argument passed to the program.

    Returns:
        True if the process exits with code 0. False if it cannot be
        started, exits with another code, or does not finish in time.
    """
    print([executable, option])
    try:
        subproc = subprocess.Popen([executable, option],
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE)
    except OSError:
        return False
    try:
        subproc.communicate(timeout=5)
    except subprocess.TimeoutExpired:
        # A hung probe must not escape as an exception or leave a child
        # behind: kill it and reap it before reporting "not available".
        subproc.kill()
        subproc.communicate()
        return False
    except OSError:
        return False
    return subproc.returncode == 0
132
133
def dir_check(arg):
    """argparse ``type=`` helper: resolve ``arg`` and require a directory.

    Returns:
        The real path of ``arg``.

    Raises:
        argparse.ArgumentTypeError: The resolved path is not a directory.
    """
    resolved = os.path.realpath(arg)
    if os.path.isdir(resolved):
        return resolved
    raise argparse.ArgumentTypeError(
        '{} is not a directory.'.format(resolved))
139
140
def file_check(arg):
    """argparse ``type=`` helper: resolve ``arg`` and require a regular file.

    Returns:
        The real path of ``arg``.

    Raises:
        argparse.ArgumentTypeError: The resolved path is not a file.
    """
    resolved = os.path.realpath(arg)
    if os.path.isfile(resolved):
        return resolved
    raise argparse.ArgumentTypeError('{} is not a file.'.format(resolved))
146
147
def get_arg_list(arg_list):
    """Flatten a sequence of iterables into one list.

    A falsy ``arg_list`` (``None`` or empty) yields an empty list.
    """
    flattened = []
    for group in arg_list or ():
        flattened.extend(group)
    return flattened
154
155
def get_arch(arch):
    """Map a machine string (e.g. ``uname -m`` output) to an arch name.

    The substring checks run in a fixed order, and the first match wins.

    Returns:
        One of ``'arm64'``, ``'arm'``, ``'x86_64'`` or ``'x86'``.

    Raises:
        Exception: No known architecture substring is present.
    """
    ordered_patterns = (
        (('aarch64',), 'arm64'),
        (('arm',), 'arm'),
        (('x86_64', 'amd64'), 'x86_64'),
        (('86',), 'x86'),
    )
    for needles, arch_name in ordered_patterns:
        if any(needle in arch for needle in needles):
            return arch_name
    raise Exception('unsupported architecture: %s' % arch.strip())
166
167
def find_tool_path(tool, tool_path=None):
    """Find a runnable path for a tool listed in EXPECTED_TOOLS.

    Candidates are probed with executable_file_available in this order:
      1. ``tool_path`` joined with the tool's ``path_in_tool`` (when
         ``tool_path`` is given),
      2. ``'../..'`` joined with ``path_in_tool`` (a relative path),
      3. the bare tool name, i.e. a lookup through $PATH.

    Args:
        tool: Key into EXPECTED_TOOLS.
        tool_path: Optional directory to try first.

    Returns:
        The first candidate that runs successfully, or None when the tool is
        unknown or no candidate works.
    """
    if tool not in EXPECTED_TOOLS:
        return None
    info = EXPECTED_TOOLS[tool]
    probe_option = info.get('test_option', '--help')
    relative_path = info['path_in_tool'].replace('/', os.sep)

    candidates = []
    if tool_path:
        candidates.append(os.path.join(tool_path, relative_path))
    candidates.append(os.path.join('../..', relative_path))
    candidates.append(tool)

    for candidate in candidates:
        if executable_file_available(candidate, probe_option):
            return candidate
    return None
192
193
def get_used_binaries(perf_data, report_file, local_lib_dir):
    """Build an HTML report from a perf data file.

    The hiperf report library writes the report JSON to ``json.txt`` in the
    current directory. That JSON is appended to the
    ``./testModule/report.html`` template, followed by the closing
    script/body/html tags, and the result is written to ``report_file``.

    Args:
        perf_data: Path of the perf data file handed to the library.
        report_file: Path of the HTML file to write.
        local_lib_dir: Optional symbol directory. When set,
            ``ReportUnwindJson`` is used instead of ``ReportJson``.
    """
    report_lib = get_lib()
    json_path = 'json.txt'
    if local_lib_dir:
        report_lib.ReportUnwindJson(perf_data.encode("utf-8"),
                                    json_path.encode("utf-8"),
                                    local_lib_dir.encode("utf-8"))
    else:
        report_lib.ReportJson(perf_data.encode("utf-8"),
                              json_path.encode("utf-8"))
    # NOTE(review): presumably gives the library time to finish writing
    # json.txt; confirm whether the calls above are already synchronous.
    time.sleep(2)
    with open(json_path, 'r') as json_file:
        all_json = json_file.read()
    with open('./testModule/report.html', 'r', encoding='utf-8') as html_file:
        html_str = html_file.read()
    # The original call passed the open mode both positionally ('w') and as
    # mode='0o666', which always raises TypeError. open() already creates
    # files with permission 0o666 (subject to umask), so the extra argument
    # is dropped.
    with open(report_file, 'w', encoding='utf-8') as report_html_file:
        report_html_file.write(html_str + all_json + '</script>'
                                                     ' </body>'
                                                     ' </html>')
    # Report where the file was written: relative to the current directory.
    print("save to %s success" % os.path.abspath(report_file))
214
215
def validate_json(json_data):
    """Return True if ``json_data`` parses as JSON, False on a ValueError."""
    try:
        json.loads(json_data)
    except ValueError:
        return False
    else:
        return True
222
223
def get_shell_result(cmd, words, ret):
    """Run a command and assert on the presence of ``words`` in its output.

    Args:
        cmd: Command line; it is split on whitespace to form the argv list.
        words: Text to look for in the decoded standard output.
        ret: When truthy ``words`` must be present, otherwise it must be
            absent.

    Raises:
        AssertionError: The expectation does not hold.
        subprocess.CalledProcessError: The command exits with non-zero code.
    """
    print(f"\nexecuting command: {cmd}")
    raw_output = subprocess.check_output(cmd.split())
    output = raw_output.decode()
    print(f"\noutput: {output}")
    assert (words in output) if ret else (words not in output)
232
233
def get_path_by_attribute(tree, key, value):
    """Depth-first search for the first node whose attribute equals ``value``.

    Each node is a dict with an ``'attributes'`` entry and a ``'children'``
    list.

    Args:
        tree: Root node of the (sub)tree to search.
        key: Attribute name to compare.
        value: Attribute value to look for.

    Returns:
        The list of child indexes leading from ``tree`` to the match (empty
        when ``tree`` itself matches), or None when nothing matches. A node
        whose ``'attributes'`` is None is reported and not descended into.
    """
    node_attributes = tree['attributes']
    if node_attributes is None:
        print('tree contains no attributes')
        return None
    if node_attributes.get(key) == value:
        return []
    for position, subtree in enumerate(tree['children']):
        tail = get_path_by_attribute(subtree, key, value)
        if tail is not None:
            return [position] + tail
    return None
248
249
def get_element_by_path(tree, path):
    """Follow a list of child indexes from ``tree`` and return that node.

    Args:
        tree: Root node; every visited node must have a ``'children'`` list.
        path: Non-empty sequence of child indexes.

    Raises:
        IndexError: ``path`` is empty or an index is out of range.
    """
    node = tree['children'][path[0]]
    for child_index in path[1:]:
        node = node['children'][child_index]
    return node
254
255
def get_location_by_text(tree, text):
    """Return the click position of the layout element showing ``text``.

    The element's ``bounds`` attribute has the form ``[x1,y1][x2,y2]``. The
    returned position is the integer centre of that rectangle.

    The previous implementation concatenated the coordinate *strings* before
    converting and scaling them, so the result depended on how many digits
    each coordinate had. It also continued after reporting a missing text
    and then crashed with an unrelated error.

    Args:
        tree: Layout tree as loaded from the dumped layout JSON.
        text: Value of the ``text`` attribute to look for.

    Returns:
        An ``(x, y)`` tuple of integers.

    Raises:
        ValueError: No element with the given text exists.
    """
    path = get_path_by_attribute(tree, 'text', text)
    if path is None:
        print('text not found in layout file')
        raise ValueError('text not found in layout file: %s' % text)
    # An empty path means the root node itself carries the text.
    element = get_element_by_path(tree, path) if path else tree
    bounds = element['attributes']['bounds']
    coords = bounds.replace('[', ' ').replace(']', ' ').replace(',', ' ')
    left, top, right, bottom = (int(item) for item in coords.split()[:4])
    return (left + right) // 2, (top + bottom) // 2
263
264
def touch(dx, dy):
    """Click the device screen at ``(dx, dy)`` through ``hdc shell uitest``.

    The command is passed as an argument list. A single command string
    without ``shell=True`` is treated as the program name on POSIX hosts and
    fails with FileNotFoundError.

    Args:
        dx: Horizontal screen coordinate.
        dy: Vertical screen coordinate.

    Raises:
        subprocess.CalledProcessError: hdc exits with a non-zero code.
    """
    print('---------------------------')
    print(dx)
    print(dy)
    print('---------------------------')
    subprocess.check_output(
        ['hdc', 'shell', 'uitest', 'uiInput', 'click', str(dx), str(dy)])
271
272
def get_layout_tree():
    """Dump the current UI layout on the device and load it as a dict.

    Steps: run ``uitest dumpLayout``, take the text after the last ``:`` of
    its output as the device path, receive that file into the current
    directory, delete it on the device and parse the local copy as JSON.

    Commands are passed as argument lists so they also work on POSIX hosts,
    where a plain command string without ``shell=True`` is treated as the
    program name. The device path is stripped so the trailing newline of the
    tool output reaches neither ``recv``/``rm`` nor the local file name.

    Returns:
        The parsed layout tree.
    """
    output = subprocess.check_output(
        ['hdc', 'shell', 'uitest', 'dumpLayout'], text=True)
    remote_path = output.split(':')[-1].strip()
    print(remote_path)
    print('hdc file recv ' + remote_path)
    subprocess.call(['hdc', 'file', 'recv', remote_path])
    subprocess.call(['hdc', 'shell', 'rm', remote_path])
    # NOTE(review): assumes hdc stores the received file in the current
    # directory under the remote file's base name - confirm.
    local_path = remote_path.rsplit('/', 1)[-1]
    print(local_path)
    with open(local_path, encoding='utf-8') as layout_file:
        return json.load(layout_file)
286
287
def touch_button_by_text(text):
    """Dump the device layout, locate the element showing ``text``, click it."""
    pos_x, pos_y = get_location_by_text(get_layout_tree(), text)
    touch(pos_x, pos_y)
292
293
class HdcInterface:
    """Thin wrapper around the ``hdc`` command-line tool."""

    def __init__(self, root_authority=True):
        """Locate hdc and remember whether root access is wanted.

        Args:
            root_authority: When False, switch_root drops root instead of
                requesting it.

        Raises:
            Exception: hdc cannot be found.
        """
        hdc_path = find_tool_path(HDC_NAME)
        if not hdc_path:
            raise Exception("Can't find hdc in PATH environment.")
        self.hdc_path = hdc_path
        self.root_authority = root_authority

    def run_hdc_cmd(self, hdc_args, log_output=True):
        """Run ``hdc <hdc_args>`` and capture its standard output.

        Args:
            hdc_args: List of arguments placed after the hdc executable.
            log_output: Print the captured output, except for the
                'file send' / 'file recv' sub-commands.

        Returns:
            ``(result, out)``: ``result`` is True when hdc exits with code
            0, ``out`` is the decoded standard output.
        """
        hdc_args = [self.hdc_path] + hdc_args
        if IS_DEBUG:
            print('run hdc cmd: %s' % hdc_args)
        subproc = subprocess.Popen(hdc_args, stdout=subprocess.PIPE)
        (out, _) = subproc.communicate()
        out = bytes_to_str(out)
        result = (subproc.returncode == 0)
        if out and log_output and \
                hdc_args[1] not in ('file send', 'file recv'):
            print(out)
        if IS_DEBUG:
            print('run hdc cmd: %s  [result %s]' % (hdc_args, result))
        return result, out

    def check_run(self, hdc_args):
        """Run an hdc command and return its output.

        Raises:
            Exception: hdc exits with a non-zero code.
        """
        result, out = self.run_hdc_cmd(hdc_args)
        if not result:
            raise Exception('run "hdc %s" failed' % hdc_args)
        return out

    def switch_root(self):
        """Try to put the hdc shell into the requested privilege state.

        Returns:
            True when the shell user is root afterwards, otherwise False
            (root not wanted, a 'user' build, or the switch failed).
        """
        if not self.root_authority:
            self._not_use_root()
            return False
        result, out = self.run_hdc_cmd(['shell', 'whoami'])
        if not result:
            return False
        if 'root' in out:
            return True
        build_type = self.get_attribute('ro.build.type')
        # The property value is raw command output; strip surrounding
        # whitespace (such as a trailing newline) so the comparison with
        # 'user' can match.
        if build_type is not None and build_type.strip() == 'user':
            return False
        self.run_hdc_cmd(['root'])
        time.sleep(1)
        self.run_hdc_cmd(['wait-for-device'])
        result, out = self.run_hdc_cmd(['shell', 'whoami'])
        return result and 'root' in out

    def get_attribute(self, name):
        """Return the raw ``getprop <name>`` output, or None on failure."""
        result, out = self.run_hdc_cmd(['shell', 'getprop',
                                        name])
        return out if result else None

    def get_device_architecture(self):
        """Return the device architecture derived from ``uname -m``."""
        output = self.check_run(['shell', 'uname', '-m'])
        return get_arch(output)

    def _not_use_root(self):
        """Drop root if the hdc shell currently runs as root."""
        result, out = self.run_hdc_cmd(['shell', 'whoami'])
        if not result or 'root' not in out:
            return
        print('unroot hdc')
        self.run_hdc_cmd(['unroot'])
        time.sleep(1)
        self.run_hdc_cmd(['wait-for-device'])
361
362
class ElfStruct:
    """Host path of a library plus the file name it gets on the device."""

    def __init__(self, path, lib_name):
        self.path, self.name = path, lib_name
367
368
class LocalLibDownload:
    """Keep the device symbol directory in sync with a host library tree.

    Libraries are identified by build id. The device keeps an index file
    (BUILD_ID_FILE) of ``build_id=lib_name`` lines inside SYMBOL_FILES_DIR.
    """

    def __init__(self, device_arch, hdc):
        """
        Args:
            device_arch: Device architecture name ('arm64', 'x86', ...).
            hdc: Object offering ``run_hdc_cmd`` / ``check_run``.
        """
        self.hdc = hdc
        self.device_arch = device_arch

        # build id -> ElfStruct for the libraries found on the host.
        self.build_id_map_of_host = {}
        # build id -> library file name as listed in the device index.
        self.build_id_map_of_device = {}
        # library file name -> number of distinct build ids seen with it.
        self.host_lib_count_map = {}
        self.request_architectures = self.__get_need_architectures(device_arch)

    @staticmethod
    def __get_need_architectures(device_arch):
        """Return the library architectures wanted for ``device_arch``."""
        wanted = {
            'x86_64': ['x86', 'x86_64'],
            'x86': ['x86'],
            'arm64': ['arm', 'arm64'],
            'arm': ['arm'],
        }
        return list(wanted.get(device_arch, []))

    def get_host_local_libs(self, local_lib_dir):
        """Scan ``local_lib_dir`` recursively and index its ``.so`` files."""
        self.build_id_map_of_host.clear()
        # Reset the name counters together with the map they describe, so a
        # second scan does not produce needlessly suffixed names.
        self.host_lib_count_map.clear()
        for root, _dirs, files in os.walk(local_lib_dir):
            for name in files:
                if name.endswith('.so'):
                    self.__append_host_local_lib(os.path.join(root, name), name)

    def get_device_local_libs(self):
        """Fetch the device index file and load it into the device map."""
        self.build_id_map_of_device.clear()
        if os.path.exists(BUILD_ID_FILE):
            remove(BUILD_ID_FILE)
        self.hdc.check_run(['shell', 'mkdir', '-p', SYMBOL_FILES_DIR])
        self.hdc.run_hdc_cmd(['file recv', SYMBOL_FILES_DIR + BUILD_ID_FILE])
        if os.path.isfile(BUILD_ID_FILE):
            with open(BUILD_ID_FILE, 'rb') as file_bin:
                for line in file_bin.readlines():
                    line = bytes_to_str(line).strip()
                    items = line.split('=')
                    if len(items) == 2:
                        self.build_id_map_of_device[items[0]] = items[1]
            remove(BUILD_ID_FILE)

    def update_device_local_libs(self):
        """Upload new libraries, delete stale ones and refresh the index."""
        # Send local libs to device.
        for build_id, elf_struct in self.build_id_map_of_host.items():
            if build_id not in self.build_id_map_of_device:
                self.hdc.check_run(['file send', elf_struct.path,
                                    SYMBOL_FILES_DIR + elf_struct.name])

        # Remove Device lib while local libs not exist on host.
        for build_id, name in self.build_id_map_of_device.items():
            if build_id not in self.build_id_map_of_host:
                self.hdc.run_hdc_cmd(['shell', 'rm', SYMBOL_FILES_DIR + name])

        # Send new build_id_list to device.
        try:
            with open(BUILD_ID_FILE, 'wb') as file_bin:
                for build_id, elf_struct in self.build_id_map_of_host.items():
                    file_bin.write(
                        str_to_bytes('%s=%s\n' % (build_id, elf_struct.name)))
            self.hdc.check_run(['file send', BUILD_ID_FILE,
                                SYMBOL_FILES_DIR + BUILD_ID_FILE])
        finally:
            # Do not leave the temporary index behind if the upload fails.
            remove(BUILD_ID_FILE)

    def _unique_lib_name(self, name):
        """Return a device file name for ``name`` and count its use.

        The first library with a given name keeps it; later ones get a
        ``_<n>`` suffix so libraries with equal names but different build
        ids do not overwrite each other.
        """
        count = self.host_lib_count_map.get(name, 0)
        self.host_lib_count_map[name] = count + 1
        if count == 0:
            return name
        # count is an int: convert it explicitly, str + int raises TypeError.
        return '%s_%d' % (name, count)

    def __append_host_local_lib(self, path, name):
        """Record one host library if it has a build id and a wanted arch."""
        build_id = get_build_id(path)
        if not build_id:
            return
        arch = get_architecture(path)
        if arch not in self.request_architectures:
            return

        elf_struct = self.build_id_map_of_host.get(build_id)
        if not elf_struct:
            self.build_id_map_of_host[build_id] = ElfStruct(
                path, self._unique_lib_name(name))
        else:
            elf_struct.path = path
458        else:
459            elf_struct.path = path
460
461
class PerformanceProfile:
    """Class of all Profilers.

    Drives ``hiperf record`` on the device through hdc: optional symbol
    upload, starting the recorder for the selected target, waiting for it to
    finish and receiving the resulting perf data file.
    """

    # Device-side location of the recorded data.
    _DEVICE_PERF_DATA = '/data/local/tmp/perf.data'

    def __init__(self, args, control_module=""):
        """
        Args:
            args: Parsed command-line namespace. The attributes read by this
                class are not_hdc_root, local_lib_dir, package_name,
                ability, local_program, cmd, pid, tid, system_wide,
                record_options and output_perf_data.
            control_module: ``--control`` mode passed to hiperf; an empty
                string selects a plain, uncontrolled recording.
        """
        self.args = args
        self.hdc = HdcInterface(root_authority=not args.not_hdc_root)
        self.device_root = self.hdc.switch_root()
        self.device_arch = self.hdc.get_device_architecture()
        self.record_subproc = None
        self.is_control = bool(control_module)
        self.control_mode = control_module

    def profile(self):
        """Run one profiling step according to the control mode."""
        if not self.is_control or self.control_mode == "prepare":
            print('prepare profiling')
            self.download()
            print('start profiling')
            if not self.combine_args():
                return
        else:
            self.exec_control()
        self.profiling()

        if not self.is_control:
            print('pull profiling data')
            self.get_profiling_data()
        if self.control_mode == "stop":
            self.wait_data_generate_done()
        print('profiling is finished.')

    def download(self):
        """Prepare recording. """
        if self.args.local_lib_dir:
            self.download_libs()

    def download_libs(self):
        """Synchronise the host symbol directory to the device."""
        executor = LocalLibDownload(self.device_arch, self.hdc)
        executor.get_host_local_libs(self.args.local_lib_dir)
        executor.get_device_local_libs()
        executor.update_device_local_libs()

    def combine_args(self):
        """Start the recorder for the target selected on the command line.

        Returns:
            False when the requested target could not be started or found,
            otherwise True.
        """
        if self.args.package_name:
            if self.args.ability:
                self.kill_process()
            self.start_profiling(['--app', self.args.package_name])
            if self.args.ability:
                ability = self.args.package_name + '/' + self.args.ability
                start_cmd = ['shell', 'aa', 'start', '-a', ability]
                # run_hdc_cmd returns (result, out); testing the tuple itself
                # is always true, so unpack the status.
                result, _ = self.hdc.run_hdc_cmd(start_cmd)
                if not result:
                    self.record_subproc.terminate()
                    print("Can't start ability %s" % ability)
                    return False
            # else: no need to start an ability.
        elif self.args.local_program:
            out = self.hdc.check_run(['shell', 'pidof',
                                      self.args.local_program])
            # pidof may print nothing or several pids; int() on the raw
            # output would fail in both cases.
            pids = [item for item in out.split() if item.isdigit()]
            if not pids:
                print("Can't find pid of %s" % self.args.local_program)
                return False
            self.start_profiling(['-p', ','.join(pids)])
        elif self.args.cmd:
            cmds = self.args.cmd.split(' ')
            cmd = [cmd.replace("'", "") for cmd in cmds]
            self.start_profiling(cmd)
        elif self.args.pid:
            self.start_profiling(['-p', ','.join(self.args.pid)])
        elif self.args.tid:
            self.start_profiling(['-t', ','.join(self.args.tid)])
        elif self.args.system_wide:
            self.start_profiling(['-a'])
        return True

    def kill_process(self):
        """Stop the target application and wait until it is gone."""
        if self.get_app_process():
            # Every other reference to the application uses package_name.
            self.hdc.check_run(['shell', 'aa', 'force-stop',
                                self.args.package_name])
            count = 0
            while True:
                time.sleep(1)
                pid = self.get_app_process()
                if not pid:
                    break
                count += 1
                # 3 seconds exec kill
                if count >= 3:
                    self.run_in_app_dir(['kill', '-9', str(pid)])

    def get_app_process(self):
        """Return the first pid of the target application, or None.

        None is returned when the command fails or prints no pid.
        """
        result, output = self.hdc.run_hdc_cmd(
            ['shell', 'pidof', self.args.package_name])
        if not result:
            return None
        pids = [item for item in output.split() if item.isdigit()]
        return int(pids[0]) if pids else None

    def run_in_app_dir(self, args):
        """Run a command in the application's context on the device."""
        if self.device_root:
            hdc_args = ['shell', 'cd /data/data/' + self.args.package_name +
                        ' && ' + (' '.join(args))]
        else:
            hdc_args = ['shell', 'run-as', self.args.package_name] + args
        return self.hdc.run_hdc_cmd(hdc_args, log_output=False)

    def start_profiling(self, selected_args):
        """Start hiperf record process on device."""
        self.hdc.run_hdc_cmd(['shell', 'rm', self._DEVICE_PERF_DATA])
        record_options = self.args.record_options.split(' ')
        record_options = [cmd.replace("'", "") for cmd in record_options]
        if self.is_control:
            args = ['hiperf', 'record',
                    '--control', self.control_mode, '-o',
                    self._DEVICE_PERF_DATA] + record_options
        else:
            args = ['hiperf', 'record', '-o',
                    self._DEVICE_PERF_DATA] + record_options
        if self.args.local_lib_dir:
            # Only the status of the (result, out) pair tells whether the
            # symbol directory is listable on the device.
            has_symbol_dir, _ = self.hdc.run_hdc_cmd(
                ['shell', 'ls', SYMBOL_FILES_DIR])
            if has_symbol_dir:
                args += ['--symbol-dir', SYMBOL_FILES_DIR]
        args += selected_args
        hdc_args = [self.hdc.hdc_path, 'shell'] + args
        print('run hdc cmd: %s' % hdc_args)
        self.record_subproc = subprocess.Popen(hdc_args)

    def profiling(self):
        """
        Wait until profiling finishes, or stop profiling when user
        presses Ctrl-C.
        """

        try:
            return_code = self.record_subproc.wait()
        except KeyboardInterrupt:
            self.end_profiling()
            self.record_subproc = None
            return_code = 0
        print('profiling result [%s]' % (return_code == 0))
        if return_code != 0:
            raise Exception('Failed to record profiling data.')

    def end_profiling(self):
        """
        Stop profiling by sending SIGINT to hiperf, and wait until it exits
        to make sure perf.data is completely generated.
        """
        has_killed = False
        while True:
            (_, out) = self.hdc.run_hdc_cmd(
                ['shell', 'pidof', 'hiperf'])
            if not out:
                break
            if not has_killed:
                has_killed = True
                self.hdc.run_hdc_cmd(['shell', 'pkill',
                                      '-l', '2', 'hiperf'])
            time.sleep(1)

    def get_profiling_data(self):
        """Receive perf.data into the current directory and delete it remotely."""
        current_path = os.getcwd()
        full_path = os.path.join(current_path, self.args.output_perf_data)
        self.hdc.check_run(['file recv', self._DEVICE_PERF_DATA,
                            full_path])
        self.hdc.run_hdc_cmd(['shell', 'rm', self._DEVICE_PERF_DATA])

    def exec_control(self):
        """Send a ``--control`` command to hiperf on the device."""
        hdc_args = [self.hdc.hdc_path, 'shell',
                    'hiperf', 'record',
                    '--control', self.control_mode]
        print('run hdc cmd: %s' % hdc_args)
        self.record_subproc = subprocess.Popen(hdc_args)

    def wait_data_generate_done(self):
        """Poll the size of perf.data until it stops changing, then fetch it."""
        last_size = None
        while True:
            # Use the same absolute path as every other perf.data access.
            (_, out) = self.hdc.run_hdc_cmd(
                ['shell', 'du', self._DEVICE_PERF_DATA])
            if "du" in out:
                # The output mentions "du" itself, which the original code
                # treats as "the file is missing".
                print("not generate perf.data")
                break
            fields = out.split()
            current_size = fields[0] if fields else ''
            if current_size == last_size:
                self.get_profiling_data()
                break
            last_size = current_size

            time.sleep(1)
650
651            time.sleep(1)
652