• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3# Copyright (C) 2016 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18"""utils.py: export utility functions.
19"""
20
21from __future__ import annotations
22import argparse
23from concurrent.futures import Future, ThreadPoolExecutor
24from dataclasses import dataclass
25import etm_types as etm
26import logging
27import os
28import os.path
29from pathlib import Path
30import re
31import shutil
32import subprocess
33import sys
34import time
35from typing import Any, Dict, Iterator, List, Optional, Set, Tuple, Union, TextIO
36
37
38NDK_ERROR_MESSAGE = "Please install the Android NDK (https://developer.android.com/studio/projects/install-ndk), then set NDK path with --ndk_path option."
39
40
41def get_script_dir() -> str:
42    return os.path.dirname(os.path.realpath(__file__))
43
44
45def is_windows() -> bool:
46    return sys.platform == 'win32' or sys.platform == 'cygwin'
47
48
49def is_darwin() -> bool:
50    return sys.platform == 'darwin'
51
52
53def get_platform() -> str:
54    if is_windows():
55        return 'windows'
56    if is_darwin():
57        return 'darwin'
58    return 'linux'
59
60
61def str_to_bytes(str_value: str) -> bytes:
62    # In python 3, str are wide strings whereas the C api expects 8 bit strings,
63    # hence we have to convert. For now using utf-8 as the encoding.
64    return str_value.encode('utf-8')
65
66
67def bytes_to_str(bytes_value: Optional[bytes]) -> str:
68    if not bytes_value:
69        return ''
70    return bytes_value.decode('utf-8')
71
72
73def get_target_binary_path(arch: str, binary_name: str) -> str:
74    if arch == 'aarch64':
75        arch = 'arm64'
76    arch_dir = os.path.join(get_script_dir(), "bin", "android", arch)
77    if not os.path.isdir(arch_dir):
78        log_fatal("can't find arch directory: %s" % arch_dir)
79    binary_path = os.path.join(arch_dir, binary_name)
80    if not os.path.isfile(binary_path):
81        log_fatal("can't find binary: %s" % binary_path)
82    return binary_path
83
84
85def get_host_binary_path(binary_name: str) -> str:
86    dirname = os.path.join(get_script_dir(), 'bin')
87    if is_windows():
88        if binary_name.endswith('.so'):
89            binary_name = binary_name[0:-3] + '.dll'
90        elif '.' not in binary_name:
91            binary_name += '.exe'
92        dirname = os.path.join(dirname, 'windows')
93    elif sys.platform == 'darwin':  # OSX
94        if binary_name.endswith('.so'):
95            binary_name = binary_name[0:-3] + '.dylib'
96        dirname = os.path.join(dirname, 'darwin')
97    else:
98        dirname = os.path.join(dirname, 'linux')
99    dirname = os.path.join(dirname, 'x86_64' if sys.maxsize > 2 ** 32 else 'x86')
100    binary_path = os.path.join(dirname, binary_name)
101    if not os.path.isfile(binary_path):
102        log_fatal("can't find binary: %s" % binary_path)
103    return binary_path
104
105
106def is_executable_available(executable: str, option='--help') -> bool:
107    """ Run an executable to see if it exists. """
108    try:
109        subproc = subprocess.Popen([executable, option], stdout=subprocess.PIPE,
110                                   stderr=subprocess.PIPE)
111        subproc.communicate()
112        return subproc.returncode == 0
113    except OSError:
114        return False
115
116
117class ToolFinder:
118    """ Find tools in ndk or sdk. """
119    DEFAULT_SDK_PATH = {
120        'darwin': 'Library/Android/sdk',
121        'linux': 'Android/Sdk',
122        'windows': 'AppData/Local/Android/sdk',
123    }
124
125    EXPECTED_TOOLS = {
126        'adb': {
127            'is_binutils': False,
128            'test_option': 'version',
129            'path_in_sdk': 'platform-tools/adb',
130        },
131        'llvm-objdump': {
132            'is_binutils': False,
133            'path_in_ndk':
134                lambda platform: 'toolchains/llvm/prebuilt/%s-x86_64/bin/llvm-objdump' % platform,
135        },
136        'llvm-readelf': {
137            'is_binutils': False,
138            'path_in_ndk':
139                lambda platform: 'toolchains/llvm/prebuilt/%s-x86_64/bin/llvm-readelf' % platform,
140        },
141        'llvm-symbolizer': {
142            'is_binutils': False,
143            'path_in_ndk':
144                lambda platform: 'toolchains/llvm/prebuilt/%s-x86_64/bin/llvm-symbolizer' % platform,
145        },
146        'llvm-strip': {
147            'is_binutils': False,
148            'path_in_ndk':
149                lambda platform: 'toolchains/llvm/prebuilt/%s-x86_64/bin/llvm-strip' % platform,
150        },
151    }
152
153    @classmethod
154    def find_ndk_and_sdk_paths(cls, ndk_path: Optional[str] = None
155                               ) -> Iterator[Tuple[Optional[str], Optional[str]]]:
156        # Use the given ndk path.
157        if ndk_path and os.path.isdir(ndk_path):
158            ndk_path = os.path.abspath(ndk_path)
159            yield ndk_path, cls.find_sdk_path(ndk_path)
160        # Find ndk in the parent directory containing simpleperf scripts.
161        ndk_path = os.path.dirname(os.path.abspath(get_script_dir()))
162        yield ndk_path, cls.find_sdk_path(ndk_path)
163        # Find ndk in the default sdk installation path.
164        if is_windows():
165            home = os.environ.get('HOMEDRIVE') + os.environ.get('HOMEPATH')
166        else:
167            home = os.environ.get('HOME')
168        if home:
169            platform = get_platform()
170            sdk_path = os.path.join(home, cls.DEFAULT_SDK_PATH[platform].replace('/', os.sep))
171            if os.path.isdir(sdk_path):
172                path = os.path.join(sdk_path, 'ndk')
173                if os.path.isdir(path):
174                    # Android Studio can install multiple ndk versions in 'ndk'.
175                    # Find the newest one.
176                    ndk_version = None
177                    for name in os.listdir(path):
178                        if not ndk_version or ndk_version < name:
179                            ndk_version = name
180                    if ndk_version:
181                        yield os.path.join(path, ndk_version), sdk_path
182            ndk_path = os.path.join(sdk_path, 'ndk-bundle')
183            if os.path.isdir(ndk_path):
184                yield ndk_path, sdk_path
185
186    @classmethod
187    def find_sdk_path(cls, ndk_path: str) -> Optional[str]:
188        path = ndk_path
189        for _ in range(2):
190            path = os.path.dirname(path)
191            if os.path.isdir(os.path.join(path, 'platform-tools')):
192                return path
193        return None
194
195    @classmethod
196    def _get_binutils_path_in_ndk(cls, toolname: str, arch: Optional[str], platform: str
197                                  ) -> Tuple[str, str]:
198        if not arch:
199            arch = 'arm64'
200        if arch == 'arm64':
201            name = 'aarch64-linux-android-' + toolname
202        elif arch == 'arm':
203            name = 'arm-linux-androideabi-' + toolname
204        elif arch == 'x86_64':
205            name = 'x86_64-linux-android-' + toolname
206        elif arch == 'x86':
207            name = 'i686-linux-android-' + toolname
208        else:
209            log_fatal('unexpected arch %s' % arch)
210        path = 'toolchains/llvm/prebuilt/%s-x86_64/bin/%s' % (platform, name)
211        return (name, path)
212
213    @classmethod
214    def find_tool_path(cls, toolname: str, ndk_path: Optional[str] = None,
215                       arch: Optional[str] = None) -> Optional[str]:
216        tool_info = cls.EXPECTED_TOOLS.get(toolname)
217        if not tool_info:
218            return None
219
220        is_binutils = tool_info['is_binutils']
221        test_option = tool_info.get('test_option', '--help')
222        platform = get_platform()
223
224        # Find tool in clang prebuilts in Android platform.
225        if toolname.startswith('llvm-') and platform == 'linux' and get_script_dir().endswith(
226                'system/extras/simpleperf/scripts'):
227            path = str(
228                Path(get_script_dir()).parents[3] / 'prebuilts' / 'clang' / 'host' / 'linux-x86' /
229                'llvm-binutils-stable' / toolname)
230            if is_executable_available(path, test_option):
231                return path
232
233        # Find tool in NDK or SDK.
234        path_in_ndk = None
235        path_in_sdk = None
236        if is_binutils:
237            toolname_with_arch, path_in_ndk = cls._get_binutils_path_in_ndk(
238                toolname, arch, platform)
239        else:
240            toolname_with_arch = toolname
241            if 'path_in_ndk' in tool_info:
242                path_in_ndk = tool_info['path_in_ndk'](platform)
243            elif 'path_in_sdk' in tool_info:
244                path_in_sdk = tool_info['path_in_sdk']
245        if path_in_ndk:
246            path_in_ndk = path_in_ndk.replace('/', os.sep)
247        elif path_in_sdk:
248            path_in_sdk = path_in_sdk.replace('/', os.sep)
249
250        for ndk_dir, sdk_dir in cls.find_ndk_and_sdk_paths(ndk_path):
251            if path_in_ndk and ndk_dir:
252                path = os.path.join(ndk_dir, path_in_ndk)
253                if is_executable_available(path, test_option):
254                    return path
255            elif path_in_sdk and sdk_dir:
256                path = os.path.join(sdk_dir, path_in_sdk)
257                if is_executable_available(path, test_option):
258                    return path
259
260        # Find tool in $PATH.
261        if is_executable_available(toolname_with_arch, test_option):
262            return toolname_with_arch
263
264        # Find tool without arch in $PATH.
265        if is_binutils and tool_info.get('accept_tool_without_arch'):
266            if is_executable_available(toolname, test_option):
267                return toolname
268        return None
269
270
271class AdbHelper(object):
272    def __init__(self, enable_switch_to_root: bool = True):
273        adb_path = ToolFinder.find_tool_path('adb')
274        if not adb_path:
275            log_exit("Can't find adb in PATH environment.")
276        self.adb_path: str = adb_path
277        self.enable_switch_to_root = enable_switch_to_root
278        self.serial_number: Optional[str] = None
279
280    def is_device_available(self) -> bool:
281        return self.run_and_return_output(['shell', 'whoami'])[0]
282
283    def run(self, adb_args: List[str], log_output: bool = False, log_stderr: bool = False) -> bool:
284        return self.run_and_return_output(adb_args, log_output, log_stderr)[0]
285
286    def run_and_return_output(self, adb_args: List[str], log_output: bool = False,
287                              log_stderr: bool = False) -> Tuple[bool, str]:
288        adb_args = [self.adb_path] + adb_args
289        logging.debug('run adb cmd: %s' % adb_args)
290        env = None
291        if self.serial_number:
292            env = os.environ.copy()
293            env['ANDROID_SERIAL'] = self.serial_number
294        subproc = subprocess.Popen(
295            adb_args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
296        stdout_data, stderr_data = subproc.communicate()
297        stdout_data = bytes_to_str(stdout_data)
298        stderr_data = bytes_to_str(stderr_data)
299        returncode = subproc.returncode
300        result = (returncode == 0)
301        if log_output and stdout_data:
302            logging.debug(stdout_data)
303        if log_stderr and stderr_data:
304            logging.warning(stderr_data)
305        logging.debug('run adb cmd: %s  [result %s]' % (adb_args, result))
306        return (result, stdout_data)
307
308    def check_run(self, adb_args: List[str], log_output: bool = False):
309        self.check_run_and_return_output(adb_args, log_output)
310
311    def check_run_and_return_output(self, adb_args: List[str], log_output: bool = False,
312                                    log_stderr: bool = False) -> str:
313        result, stdoutdata = self.run_and_return_output(adb_args, log_output, True)
314        if not result:
315            log_exit('run "adb %s" failed: %s' % (adb_args, stdoutdata))
316        return stdoutdata
317
318    def _unroot(self):
319        result, stdoutdata = self.run_and_return_output(['shell', 'whoami'])
320        if not result:
321            return
322        if 'root' not in stdoutdata:
323            return
324        logging.info('unroot adb')
325        self.run(['unroot'])
326        time.sleep(1)
327        self.run(['wait-for-device'])
328
329    def switch_to_root(self) -> bool:
330        if not self.enable_switch_to_root:
331            self._unroot()
332            return False
333        result, stdoutdata = self.run_and_return_output(['shell', 'whoami'])
334        if not result:
335            return False
336        if 'root' in stdoutdata:
337            return True
338        build_type = self.get_property('ro.build.type')
339        if build_type == 'user':
340            return False
341        self.run(['root'])
342        time.sleep(1)
343        self.run(['wait-for-device'])
344        result, stdoutdata = self.run_and_return_output(['shell', 'whoami'])
345        return result and 'root' in stdoutdata
346
347    def get_property(self, name: str) -> Optional[str]:
348        result, stdoutdata = self.run_and_return_output(['shell', 'getprop', name])
349        return stdoutdata.strip() if result else None
350
351    def set_property(self, name: str, value: str) -> bool:
352        return self.run(['shell', 'setprop', name, value])
353
354    def get_device_arch(self) -> str:
355        output = self.check_run_and_return_output(['shell', 'uname', '-m'])
356        if 'aarch64' in output:
357            return 'arm64'
358        if 'arm' in output:
359            return 'arm'
360        if 'x86_64' in output:
361            return 'x86_64'
362        if '86' in output:
363            return 'x86'
364        if 'riscv64' in output:
365            return 'riscv64'
366        log_fatal('unsupported architecture: %s' % output.strip())
367        return ''
368
369    def get_android_version(self) -> int:
370        """ Get Android version on device, like 7 is for Android N, 8 is for Android O."""
371        def parse_version(s: str) -> int:
372            if not s:
373                return 0
374            if s[0].isdigit():
375                i = 1
376                while i < len(s) and s[i].isdigit():
377                    i += 1
378                return int(s[:i])
379            else:
380                c = s[0].upper()
381                if c.isupper() and 'L' <= c <= 'V':
382                    return ord(c) - ord('L') + 5
383            return 0
384
385        android_version = 0
386        s = self.get_property('ro.build.version.codename')
387        if s != 'REL':
388            android_version = parse_version(s)
389        if android_version == 0:
390            s = self.get_property('ro.build.version.release')
391            android_version = parse_version(s)
392        if android_version == 0:
393            s = self.get_property('ro.build.version.sdk')
394            if int(s) >= 35:
395                android_version = 15
396        return android_version
397
398
399def flatten_arg_list(arg_list: List[List[str]]) -> List[str]:
400    res = []
401    if arg_list:
402        for items in arg_list:
403            res += items
404    return res
405
406
407def remove(dir_or_file: Union[Path, str]):
408    if os.path.isfile(dir_or_file):
409        os.remove(dir_or_file)
410    elif os.path.isdir(dir_or_file):
411        shutil.rmtree(dir_or_file, ignore_errors=True)
412
413
414def open_report_in_browser(report_path: str):
415    if is_darwin():
416        # On darwin 10.12.6, webbrowser can't open browser, so try `open` cmd first.
417        try:
418            subprocess.check_call(['open', report_path])
419            return
420        except subprocess.CalledProcessError:
421            pass
422    import webbrowser
423    try:
424        # Try to open the report with Chrome
425        browser = webbrowser.get('google-chrome')
426        browser.open(report_path, new=0, autoraise=True)
427    except webbrowser.Error:
428        # webbrowser.get() doesn't work well on darwin/windows.
429        webbrowser.open_new_tab(report_path)
430
431
432class BinaryFinder:
433    def __init__(self, binary_cache_dir: Optional[Union[Path, str]], readelf: ReadElf):
434        if isinstance(binary_cache_dir, str):
435            binary_cache_dir = Path(binary_cache_dir)
436        self.binary_cache_dir = binary_cache_dir
437        self.readelf = readelf
438        self.build_id_map = self._load_build_id_map()
439
440    def _load_build_id_map(self) -> Dict[str, Path]:
441        build_id_map: Dict[str, Path] = {}
442        if self.binary_cache_dir:
443            build_id_list_file = self.binary_cache_dir / 'build_id_list'
444            if build_id_list_file.is_file():
445                with open(self.binary_cache_dir / 'build_id_list', 'rb') as fh:
446                    for line in fh.readlines():
447                        # lines are in format "<build_id>=<path_in_binary_cache>".
448                        items = bytes_to_str(line).strip().split('=')
449                        if len(items) == 2:
450                            build_id_map[items[0]] = self.binary_cache_dir / items[1]
451        return build_id_map
452
453    def find_binary(self, dso_path_in_record_file: str,
454                    expected_build_id: Optional[str]) -> Optional[Path]:
455        """ If expected_build_id is None, don't check build id.
456            Otherwise, the build id of the found binary should match the expected one."""
457        # Find binary from build id map.
458        if expected_build_id:
459            path = self.build_id_map.get(expected_build_id)
460            if path and self._check_path(path, expected_build_id):
461                return path
462        # Find binary by path in binary cache.
463        if self.binary_cache_dir:
464            path = self.binary_cache_dir / dso_path_in_record_file[1:].replace('/', os.sep)
465            if self._check_path(path, expected_build_id):
466                return path
467        # Find binary by its absolute path.
468        path = Path(dso_path_in_record_file)
469        if self._check_path(path, expected_build_id):
470            return path
471        return None
472
473    def _check_path(self, path: Path, expected_build_id: Optional[str]) -> bool:
474        if not self.readelf.is_elf_file(path):
475            return False
476        if expected_build_id is not None:
477            return self.readelf.get_build_id(path) == expected_build_id
478        return True
479
480
481class Addr2Nearestline(object):
482    """ Use llvm-symbolizer to convert (dso_path, func_addr, addr) to (source_file, line).
483        For instructions generated by C++ compilers without a matching statement in source code
484        (like stack corruption check, switch optimization, etc.), addr2line can't generate
485        line information. However, we want to assign the instruction to the nearest line before
486        the instruction (just like objdump -dl). So we use below strategy:
487        Instead of finding the exact line of the instruction in an address, we find the nearest
488        line to the instruction in an address. If an address doesn't have a line info, we find
489        the line info of address - 1. If still no line info, then use address - 2, address - 3,
490        etc.
491
492        The implementation steps are as below:
493        1. Collect all (dso_path, func_addr, addr) requests before converting. This saves the
494        times to call addr2line.
495        2. Convert addrs to (source_file, line) pairs for each dso_path as below:
496          2.1 Check if the dso_path has .debug_line. If not, omit its conversion.
497          2.2 Get arch of the dso_path, and decide the addr_step for it. addr_step is the step we
498          change addr each time. For example, since instructions of arm64 are all 4 bytes long,
499          addr_step for arm64 can be 4.
500          2.3 Use addr2line to find line info for each addr in the dso_path.
501          2.4 For each addr without line info, use addr2line to find line info for
502              range(addr - addr_step, addr - addr_step * 4 - 1, -addr_step).
503          2.5 For each addr without line info, use addr2line to find line info for
504              range(addr - addr_step * 5, addr - addr_step * 128 - 1, -addr_step).
505              (128 is a guess number. A nested switch statement in
506               system/core/demangle/Demangler.cpp has >300 bytes without line info in arm64.)
507    """
508    class Dso(object):
509        """ Info of a dynamic shared library.
510            addrs: a map from address to Addr object in this dso.
511        """
512
513        def __init__(self, build_id: Optional[str]):
514            self.build_id = build_id
515            self.addrs: Dict[int, Addr2Nearestline.Addr] = {}
516            # Saving file names for each addr takes a lot of memory. So we store file ids in Addr,
517            # and provide data structures connecting file id and file name here.
518            self.file_name_to_id: Dict[str, int] = {}
519            self.file_id_to_name: List[str] = []
520            self.func_name_to_id: Dict[str, int] = {}
521            self.func_id_to_name: List[str] = []
522
523        def get_file_id(self, file_path: str) -> int:
524            file_id = self.file_name_to_id.get(file_path)
525            if file_id is None:
526                file_id = self.file_name_to_id[file_path] = len(self.file_id_to_name)
527                self.file_id_to_name.append(file_path)
528            return file_id
529
530        def get_func_id(self, func_name: str) -> int:
531            func_id = self.func_name_to_id.get(func_name)
532            if func_id is None:
533                func_id = self.func_name_to_id[func_name] = len(self.func_id_to_name)
534                self.func_id_to_name.append(func_name)
535            return func_id
536
537    class Addr(object):
538        """ Info of an addr request.
539            func_addr: start_addr of the function containing addr.
540            source_lines: a list of [file_id, line_number] for addr.
541                          source_lines[:-1] are all for inlined functions.
542        """
543
544        def __init__(self, func_addr: int):
545            self.func_addr = func_addr
546            self.source_lines: Optional[List[int, int]] = None
547
548    def __init__(
549            self, ndk_path: Optional[str],
550            binary_finder: BinaryFinder, with_function_name: bool):
551        self.symbolizer_path = ToolFinder.find_tool_path('llvm-symbolizer', ndk_path)
552        if not self.symbolizer_path:
553            log_exit("Can't find llvm-symbolizer. " + NDK_ERROR_MESSAGE)
554        self.readelf = ReadElf(ndk_path)
555        self.dso_map: Dict[str, Addr2Nearestline.Dso] = {}  # map from dso_path to Dso.
556        self.binary_finder = binary_finder
557        self.with_function_name = with_function_name
558
559    def add_addr(self, dso_path: str, build_id: Optional[str], func_addr: int, addr: int):
560        dso = self.dso_map.get(dso_path)
561        if dso is None:
562            dso = self.dso_map[dso_path] = self.Dso(build_id)
563        if addr not in dso.addrs:
564            dso.addrs[addr] = self.Addr(func_addr)
565
566    def convert_addrs_to_lines(self, jobs: int):
567        with ThreadPoolExecutor(jobs) as executor:
568            futures: List[Future] = []
569            for dso_path, dso in self.dso_map.items():
570                futures.append(executor.submit(self._convert_addrs_in_one_dso, dso_path, dso))
571            for future in futures:
572                # Call future.result() to report exceptions raised in the executor.
573                future.result()
574
575    def _convert_addrs_in_one_dso(self, dso_path: str, dso: Addr2Nearestline.Dso):
576        real_path = self.binary_finder.find_binary(dso_path, dso.build_id)
577        if not real_path:
578            if dso_path not in ['//anon', 'unknown', '[kernel.kallsyms]']:
579                logging.debug("Can't find dso %s" % dso_path)
580            return
581
582        if not self._check_debug_line_section(real_path):
583            logging.debug("file %s doesn't contain .debug_line section." % real_path)
584            return
585
586        addr_step = self._get_addr_step(real_path)
587        self._collect_line_info(dso, real_path, [0])
588        self._collect_line_info(dso, real_path, range(-addr_step, -addr_step * 4 - 1, -addr_step))
589        self._collect_line_info(dso, real_path,
590                                range(-addr_step * 5, -addr_step * 128 - 1, -addr_step))
591
592    def _check_debug_line_section(self, real_path: Path) -> bool:
593        return '.debug_line' in self.readelf.get_sections(real_path)
594
595    def _get_addr_step(self, real_path: Path) -> int:
596        arch = self.readelf.get_arch(real_path)
597        if arch == 'arm64':
598            return 4
599        if arch == 'arm':
600            return 2
601        return 1
602
603    def _collect_line_info(
604            self, dso: Addr2Nearestline.Dso, real_path: Path, addr_shifts: List[int]):
605        """ Use addr2line to get line info in a dso, with given addr shifts. """
606        # 1. Collect addrs to send to addr2line.
607        addr_set: Set[int] = set()
608        for addr in dso.addrs:
609            addr_obj = dso.addrs[addr]
610            if addr_obj.source_lines:  # already has source line, no need to search.
611                continue
612            for shift in addr_shifts:
613                # The addr after shift shouldn't change to another function.
614                shifted_addr = max(addr + shift, addr_obj.func_addr)
615                addr_set.add(shifted_addr)
616                if shifted_addr == addr_obj.func_addr:
617                    break
618        if not addr_set:
619            return
620        addr_request = '\n'.join(['0x%x' % addr for addr in sorted(addr_set)])
621
622        # 2. Use addr2line to collect line info.
623        try:
624            subproc = subprocess.Popen(self._build_symbolizer_args(real_path),
625                                       stdin=subprocess.PIPE, stdout=subprocess.PIPE)
626            (stdoutdata, _) = subproc.communicate(str_to_bytes(addr_request))
627            stdoutdata = bytes_to_str(stdoutdata)
628        except OSError:
629            return
630        addr_map = self.parse_line_output(stdoutdata, dso)
631
632        # 3. Fill line info in dso.addrs.
633        for addr in dso.addrs:
634            addr_obj = dso.addrs[addr]
635            if addr_obj.source_lines:
636                continue
637            for shift in addr_shifts:
638                shifted_addr = max(addr + shift, addr_obj.func_addr)
639                lines = addr_map.get(shifted_addr)
640                if lines:
641                    addr_obj.source_lines = lines
642                    break
643                if shifted_addr == addr_obj.func_addr:
644                    break
645
646    def _build_symbolizer_args(self, binary_path: Path) -> List[str]:
647        args = [self.symbolizer_path, '--print-address', '--inlining', '--obj=%s' % binary_path]
648        if self.with_function_name:
649            args += ['--functions=linkage', '--demangle']
650        else:
651            args.append('--functions=none')
652        return args
653
654    def parse_line_output(self, output: str, dso: Addr2Nearestline.Dso) -> Dict[int,
655                                                                                List[Tuple[int]]]:
656        """
657        The output is a list of lines.
658            address1
659            function_name1 (the function name can be empty)
660            source_location1
661            function_name2
662            source_location2
663            ...
664            (end with empty line)
665        """
666
667        addr_map: Dict[int, List[Tuple[int]]] = {}
668        lines = output.strip().splitlines()
669        i = 0
670        while i < len(lines):
671            address = self._parse_line_output_address(lines[i])
672            i += 1
673            if address is None:
674                continue
675            info = []
676            while i < len(lines):
677                if self.with_function_name:
678                    if i + 1 == len(lines):
679                        break
680                    function_name = lines[i].strip()
681                    if not function_name and (':' not in lines[i+1]):
682                        # no more frames
683                        break
684                    i += 1
685                elif not lines[i]:
686                    i += 1
687                    break
688
689                file_path, line_number = self._parse_line_output_source_location(lines[i])
690                i += 1
691                if not file_path or not line_number:
692                    # An addr can have a list of (file, line), when the addr belongs to an inlined
693                    # function. Sometimes only part of the list has ? mark. In this case, we think
694                    # the line info is valid if the first line doesn't have ? mark.
695                    if not info:
696                        break
697                    continue
698                file_id = dso.get_file_id(file_path)
699                if self.with_function_name:
700                    func_id = dso.get_func_id(function_name)
701                    info.append((file_id, line_number, func_id))
702                else:
703                    info.append((file_id, line_number))
704            if info:
705                addr_map[address] = info
706        return addr_map
707
708    def _parse_line_output_address(self, output: str) -> Optional[int]:
709        if output.startswith('0x'):
710            return int(output, 16)
711        return None
712
713    def _parse_line_output_source_location(self, line: str) -> Tuple[Optional[str], Optional[int]]:
714        file_path, line_number = None, None
715        # Handle lines in format filename:line:column, like "runtest/two_functions.cpp:14:25".
716        # Filename may contain ':' like "C:\Users\...\file".
717        items = line.rsplit(':', 2)
718        if len(items) == 3:
719            file_path, line_number = items[:2]
720        if not file_path or ('?' in file_path) or not line_number or ('?' in line_number):
721            return None, None
722        try:
723            line_number = int(line_number)
724        except ValueError:
725            return None, None
726        return file_path, line_number
727
728    def get_dso(self, dso_path: str) -> Addr2Nearestline.Dso:
729        return self.dso_map.get(dso_path)
730
731    def get_addr_source(self, dso: Addr2Nearestline.Dso, addr: int) -> Optional[List[Tuple[int]]]:
732        source = dso.addrs[addr].source_lines
733        if source is None:
734            return None
735        if self.with_function_name:
736            return [(dso.file_id_to_name[file_id], line, dso.func_id_to_name[func_id])
737                    for (file_id, line, func_id) in source]
738        return [(dso.file_id_to_name[file_id], line) for (file_id, line) in source]
739
740
741class SourceFileSearcher(object):
742    """ Find source file paths in the file system.
743        The file paths reported by addr2line are the paths stored in debug sections
744        of shared libraries. And we need to convert them to file paths in the file
745        system. It is done in below steps:
746        1. Collect all file paths under the provided source_dirs. The suffix of a
747           source file should contain one of below:
748            h: for C/C++ header files.
749            c: for C/C++ source files.
750            java: for Java source files.
751            kt: for Kotlin source files.
752        2. Given an abstract_path reported by addr2line, select the best real path
753           as below:
754           2.1 Find all real paths with the same file name as the abstract path.
755           2.2 Select the real path having the longest common suffix with the abstract path.
756    """
757
758    SOURCE_FILE_EXTS = {'.h', '.hh', '.H', '.hxx', '.hpp', '.h++',
759                        '.c', '.cc', '.C', '.cxx', '.cpp', '.c++',
760                        '.java', '.kt'}
761
762    @classmethod
763    def is_source_filename(cls, filename: str) -> bool:
764        ext = os.path.splitext(filename)[1]
765        return ext in cls.SOURCE_FILE_EXTS
766
767    def __init__(self, source_dirs: List[str]):
768        # Map from filename to a list of reversed directory path containing filename.
769        self.filename_to_rparents: Dict[str, List[str]] = {}
770        self._collect_paths(source_dirs)
771
772    def _collect_paths(self, source_dirs: List[str]):
773        for source_dir in source_dirs:
774            for parent, _, file_names in os.walk(source_dir):
775                rparent = None
776                for file_name in file_names:
777                    if self.is_source_filename(file_name):
778                        rparents = self.filename_to_rparents.get(file_name)
779                        if rparents is None:
780                            rparents = self.filename_to_rparents[file_name] = []
781                        if rparent is None:
782                            rparent = parent[::-1]
783                        rparents.append(rparent)
784
785    def get_real_path(self, abstract_path: str) -> Optional[str]:
786        abstract_path = abstract_path.replace('/', os.sep)
787        abstract_parent, file_name = os.path.split(abstract_path)
788        abstract_rparent = abstract_parent[::-1]
789        real_rparents = self.filename_to_rparents.get(file_name)
790        if real_rparents is None:
791            return None
792        best_matched_rparent = None
793        best_common_length = -1
794        for real_rparent in real_rparents:
795            length = len(os.path.commonprefix((real_rparent, abstract_rparent)))
796            if length > best_common_length:
797                best_common_length = length
798                best_matched_rparent = real_rparent
799        if best_matched_rparent is None:
800            return None
801        return os.path.join(best_matched_rparent[::-1], file_name)
802
803
804class AddrRange:
805    def __init__(self, start: int, len: int):
806        self.start = start
807        self.len = len
808
809    @property
810    def end(self) -> int:
811        return self.start + self.len
812
813    def is_in_range(self, addr: int) -> bool:
814        return addr >= self.start and addr < self.end
815
816
817class Disassembly:
818    def __init__(self):
819        self.lines: List[Tuple[str, int]] = []
820
821
822class Objdump(object):
823    """ A wrapper of objdump to disassemble code. """
824
825    def __init__(self, ndk_path: Optional[str], binary_finder: BinaryFinder):
826        self.ndk_path = ndk_path
827        self.binary_finder = binary_finder
828        self.readelf = ReadElf(ndk_path)
829        self.objdump_paths: Dict[str, str] = {}
830
831    def _objdump_path(self, arch):
832        objdump_path = self.objdump_paths.get(arch)
833        if not objdump_path:
834            objdump_path = ToolFinder.find_tool_path('llvm-objdump', self.ndk_path, arch)
835            if not objdump_path:
836                log_exit("Can't find llvm-objdump." + NDK_ERROR_MESSAGE)
837            self.objdump_paths[arch] = objdump_path
838
839        return objdump_path
840
841    def get_dso_info(self, dso_path: str, expected_build_id: Optional[str]
842                     ) -> Optional[Tuple[str, str]]:
843        real_path = self.binary_finder.find_binary(dso_path, expected_build_id)
844        if not real_path:
845            return None
846        arch = self.readelf.get_arch(real_path)
847        if arch == 'unknown':
848            return None
849        return (str(real_path), arch)
850
851    def disassemble_whole(self, dso_info) -> Dict[int, str]:
852        """Disassemble all code in a binary, returning a dictionary mapping
853           addresses to assembly output.
854        """
855        real_path, arch = dso_info
856        objdump_path = self._objdump_path(arch)
857
858        disassembly = {}
859        try:
860            raw_output = subprocess.check_output([objdump_path, '-d', '--demangle', real_path])
861            output = bytes_to_str(raw_output)
862            for line in output.split('\n'):
863                match = re.match(r'^\s*([0-9A-Fa-f]+):', line)
864                if not match:
865                    continue
866                addr = int(match.group(1), 16)
867                disassembly[addr] = line
868
869        except subprocess.CalledProcessError:
870            pass
871
872        return disassembly
873
874    def disassemble_function(self, dso_info, addr_range: AddrRange) -> Optional[Disassembly]:
875        """ Disassemble code for an addr range in a binary.
876        """
877        real_path, arch = dso_info
878
879        objdump_path = self._objdump_path(arch)
880        # Run objdump.
881        args = [objdump_path, '-dlC', '--no-show-raw-insn',
882                '--start-address=0x%x' % addr_range.start,
883                '--stop-address=0x%x' % (addr_range.end),
884                real_path]
885        if arch == 'arm' and 'llvm-objdump' in objdump_path:
886            args += ['--print-imm-hex']
887        logging.debug('disassembling: %s', ' '.join(args))
888        try:
889            subproc = subprocess.Popen(args, stdout=subprocess.PIPE)
890            (stdoutdata, _) = subproc.communicate()
891            stdoutdata = bytes_to_str(stdoutdata)
892        except OSError:
893            return None
894
895        if not stdoutdata:
896            return None
897        result = Disassembly()
898        for line in stdoutdata.split('\n'):
899            line = line.rstrip()  # Remove '\r' on Windows.
900            items = line.split(':', 1)
901            try:
902                addr = int(items[0], 16)
903            except ValueError:
904                addr = 0
905            result.lines.append((line, addr))
906        return result
907
908    def disassemble_functions(self, dso_info, sorted_addr_ranges: List[AddrRange]
909                              ) -> Optional[List[Disassembly]]:
910        """ Disassemble code for multiple addr ranges in a binary. sorted_addr_ranges should be
911            sorted by addr_range.start.
912        """
913        if not sorted_addr_ranges:
914            return []
915        real_path, arch = dso_info
916        objdump_path = self._objdump_path(arch)
917
918        # Run objdump.
919        start_addr = sorted_addr_ranges[0].start
920        stop_addr = max(addr_range.end for addr_range in sorted_addr_ranges)
921        args = [objdump_path, '-dlC', '--no-show-raw-insn',
922                '--start-address=0x%x' % start_addr,
923                '--stop-address=0x%x' % stop_addr,
924                real_path]
925        if arch == 'arm' and 'llvm-objdump' in objdump_path:
926            args += ['--print-imm-hex']
927        try:
928            proc = subprocess.Popen(args, stdout=subprocess.PIPE, text=True)
929            result = self._parse_disassembly_for_functions(proc.stdout, sorted_addr_ranges)
930            proc.wait()
931        except OSError:
932            return None
933        return result
934
935    def _parse_disassembly_for_functions(self, fh: TextIO, sorted_addr_ranges: List[AddrRange]) -> Optional[List[Disassembly]]:
936        current_id = 0
937        in_range = False
938        result = [Disassembly() for _ in sorted_addr_ranges]
939        while True:
940            line = fh.readline()
941            if not line:
942                break
943            line = line.rstrip()  # Remove '\r\n'.
944            addr = self._get_addr_from_disassembly_line(line)
945            if current_id >= len(sorted_addr_ranges):
946                continue
947            if addr:
948                if in_range and not sorted_addr_ranges[current_id].is_in_range(addr):
949                    in_range = False
950                if not in_range:
951                    # Skip addr ranges before the current address.
952                    while current_id < len(sorted_addr_ranges) and sorted_addr_ranges[current_id].end <= addr:
953                        current_id += 1
954                    if current_id < len(sorted_addr_ranges) and sorted_addr_ranges[current_id].is_in_range(addr):
955                        in_range = True
956            if in_range:
957                result[current_id].lines.append((line, addr))
958        return result
959
960    def _get_addr_from_disassembly_line(self, line: str) -> int:
961        # line may be an instruction, like: " 24a469c: stp x29, x30, [sp, #-0x60]!" or
962        #  "ffffffc0085d9664:      	paciasp".
963        # line may be a function start point, like "00000000024a4698 <DoWork()>:".
964        items = line.strip().split()
965        if not items:
966            return 0
967        s = items[0]
968        if s.endswith(':'):
969            s = s[:-1]
970        try:
971            return int(s, 16)
972        except ValueError:
973            return 0
974
975    def get_plt_symbols(self, dso_info) -> List[Tuple[int, int, str]]:
976        """Get the symbols of sorted list of (start, length, name) tuples."""
977        # This uses objdump to get the names of the PLT stubs since nothing else seems to be capable
978        # of figuring them out.
979        real_path, arch = dso_info
980        objdump_path = self._objdump_path(arch)
981
982        symbols = []
983        try:
984            raw_output = subprocess.check_output([objdump_path,
985                                                  '-d', '--section=.plt', real_path])
986            output = bytes_to_str(raw_output)
987            name = None
988            start = None
989            last = None
990            for line in output.split('\n'):
991                if line.endswith('@plt>:'):
992                    (start, name) = line.split()
993                    name = name[1:-2]
994                    last = start
995                if start:
996                    if line == '':
997                        if start is not None and last is not None and name:
998                            symbols.append((int(start, 16),
999                                            int(last, 16) - int(start, 16) + 4, name))
1000                        name = None
1001                        start = None
1002                        last = None
1003                    else:
1004                        last = line.split()[0][:-1]
1005
1006        except subprocess.CalledProcessError:
1007            pass
1008
1009        symbols.sort(key=lambda e: e[0])
1010        return symbols
1011
1012
1013class ReadElf(object):
1014    """ A wrapper of readelf. """
1015
1016    def __init__(self, ndk_path: Optional[str]):
1017        self.readelf_path = ToolFinder.find_tool_path('llvm-readelf', ndk_path)
1018        if not self.readelf_path:
1019            log_exit("Can't find llvm-readelf. " + NDK_ERROR_MESSAGE)
1020
1021    @staticmethod
1022    def is_elf_file(path: Union[Path, str]) -> bool:
1023        if os.path.isfile(path):
1024            with open(path, 'rb') as fh:
1025                return fh.read(4) == b'\x7fELF'
1026        return False
1027
1028    def get_arch(self, elf_file_path: Union[Path, str]) -> str:
1029        """ Get arch of an elf file. """
1030        if self.is_elf_file(elf_file_path):
1031            try:
1032                output = subprocess.check_output([self.readelf_path, '-h', str(elf_file_path)])
1033                output = bytes_to_str(output)
1034                if output.find('AArch64') != -1:
1035                    return 'arm64'
1036                if output.find('ARM') != -1:
1037                    return 'arm'
1038                if output.find('X86-64') != -1:
1039                    return 'x86_64'
1040                if output.find('80386') != -1:
1041                    return 'x86'
1042                if output.find('RISC-V') != -1:
1043                    return 'riscv64'
1044            except subprocess.CalledProcessError:
1045                pass
1046        return 'unknown'
1047
1048    def get_build_id(self, elf_file_path: Union[Path, str], with_padding=True) -> str:
1049        """ Get build id of an elf file. """
1050        if self.is_elf_file(elf_file_path):
1051            try:
1052                output = subprocess.check_output([self.readelf_path, '-n', str(elf_file_path)])
1053                output = bytes_to_str(output)
1054                result = re.search(r'Build ID:\s*(\S+)', output)
1055                if result:
1056                    build_id = result.group(1)
1057                    if with_padding:
1058                        build_id = self.pad_build_id(build_id)
1059                    return build_id
1060            except subprocess.CalledProcessError:
1061                pass
1062        return ""
1063
1064    @staticmethod
1065    def pad_build_id(build_id: str) -> str:
1066        """ Pad build id to 40 hex numbers (20 bytes). """
1067        if len(build_id) < 40:
1068            build_id += '0' * (40 - len(build_id))
1069        else:
1070            build_id = build_id[:40]
1071        return '0x' + build_id
1072
1073    @staticmethod
1074    def unpad_build_id(build_id: str) -> str:
1075        if build_id.startswith('0x'):
1076            build_id = build_id[2:]
1077            # Unpad build id as TrimZeroesFromBuildIDString() in quipper.
1078            padding = '0' * 8
1079            while build_id.endswith(padding):
1080                build_id = build_id[:-len(padding)]
1081        return build_id
1082
1083    def get_sections(self, elf_file_path: Union[Path, str]) -> List[str]:
1084        """ Get sections of an elf file. """
1085        section_names: List[str] = []
1086        if self.is_elf_file(elf_file_path):
1087            try:
1088                output = subprocess.check_output([self.readelf_path, '-SW', str(elf_file_path)])
1089                output = bytes_to_str(output)
1090                for line in output.split('\n'):
1091                    # Parse line like:" [ 1] .note.android.ident NOTE  0000000000400190 ...".
1092                    result = re.search(r'^\s+\[\s*\d+\]\s(.+?)\s', line)
1093                    if result:
1094                        section_name = result.group(1).strip()
1095                        if section_name:
1096                            section_names.append(section_name)
1097            except subprocess.CalledProcessError:
1098                pass
1099        return section_names
1100
1101
1102def extant_dir(arg: str) -> str:
1103    """ArgumentParser type that only accepts extant directories.
1104
1105    Args:
1106        arg: The string argument given on the command line.
1107    Returns: The argument as a realpath.
1108    Raises:
1109        argparse.ArgumentTypeError: The given path isn't a directory.
1110    """
1111    path = os.path.realpath(arg)
1112    if not os.path.isdir(path):
1113        raise argparse.ArgumentTypeError('{} is not a directory.'.format(path))
1114    return path
1115
1116
1117def extant_file(arg: str) -> str:
1118    """ArgumentParser type that only accepts extant files.
1119
1120    Args:
1121        arg: The string argument given on the command line.
1122    Returns: The argument as a realpath.
1123    Raises:
1124        argparse.ArgumentTypeError: The given path isn't a file.
1125    """
1126    path = os.path.realpath(arg)
1127    if not os.path.isfile(path):
1128        raise argparse.ArgumentTypeError('{} is not a file.'.format(path))
1129    return path
1130
1131
1132def log_fatal(msg: str):
1133    raise Exception(msg)
1134
1135
1136def log_exit(msg: str):
1137    sys.exit(msg)
1138
1139
1140class LogFormatter(logging.Formatter):
1141    """ Use custom logging format. """
1142
1143    def __init__(self):
1144        super().__init__('%(asctime)s [%(levelname)s] (%(filename)s:%(lineno)d) %(message)s')
1145
1146    def formatTime(self, record, datefmt):
1147        return super().formatTime(record, '%H:%M:%S') + ',%03d' % record.msecs
1148
1149
1150class Log:
1151    initialized = False
1152
1153    @classmethod
1154    def init(cls, log_level: str = 'info'):
1155        assert not cls.initialized
1156        cls.initialized = True
1157        cls.logger = logging.root
1158        cls.logger.setLevel(log_level.upper())
1159        handler = logging.StreamHandler()
1160        handler.setFormatter(LogFormatter())
1161        cls.logger.addHandler(handler)
1162
1163
1164class ArgParseFormatter(
1165        argparse.ArgumentDefaultsHelpFormatter, argparse.RawDescriptionHelpFormatter):
1166    pass
1167
1168
1169@dataclass
1170class ReportLibOptions:
1171    show_art_frames: bool
1172    remove_method: List[str]
1173    trace_offcpu: str
1174    proguard_mapping_files: List[str]
1175    sample_filters: List[str]
1176    aggregate_threads: List[str]
1177
1178
1179class BaseArgumentParser(argparse.ArgumentParser):
1180    def __init__(self, *args, **kwargs):
1181        super().__init__(*args, **kwargs, formatter_class=ArgParseFormatter)
1182        self.has_sample_filter_options = False
1183        self.sample_filter_with_pid_shortcut = False
1184        self.has_report_lib_options = False
1185
1186    def add_report_lib_options(self, group: Optional[Any] = None,
1187                               default_show_art_frames: bool = False,
1188                               sample_filter_group: Optional[Any] = None,
1189                               sample_filter_with_pid_shortcut: bool = True):
1190        self.has_report_lib_options = True
1191        parser = group if group else self
1192        parser.add_argument(
1193            '--proguard-mapping-file', nargs='+',
1194            help='Add proguard mapping file to de-obfuscate symbols')
1195        parser.add_argument('--show-art-frames', '--show_art_frames',
1196                            action=argparse.BooleanOptionalAction, default=default_show_art_frames,
1197                            help='Show frames of internal methods in the ART Java interpreter.')
1198        parser.add_argument('--remove-method', nargs='+', metavar='method_name_regex',
1199                            help='remove methods with name containing the regular expression')
1200        parser.add_argument(
1201            '--trace-offcpu', choices=['on-cpu', 'off-cpu', 'on-off-cpu', 'mixed-on-off-cpu'],
1202            help="""Set report mode for profiles recorded with --trace-offcpu option. All possible
1203                    modes are: on-cpu (only on-cpu samples), off-cpu (only off-cpu samples),
1204                    on-off-cpu (both on-cpu and off-cpu samples, can be split by event name),
1205                    mixed-on-off-cpu (on-cpu and off-cpu samples using the same event name).
1206                    If not set, mixed-on-off-cpu mode is used.
1207                """)
1208        self._add_sample_filter_options(sample_filter_group, sample_filter_with_pid_shortcut)
1209        parser.add_argument(
1210            '--aggregate-threads', nargs='+', metavar='thread_name_regex',
1211            help="""Aggregate threads with names matching the same regex. As a result, samples from
1212                    different threads (like a thread pool) can be shown in one flamegraph.
1213                """)
1214
1215    def _add_sample_filter_options(
1216            self, group: Optional[Any] = None, with_pid_shortcut: bool = True):
1217        if not group:
1218            group = self.add_argument_group('Sample filter options')
1219        group.add_argument('--cpu', nargs='+', help="""only include samples for the selected cpus.
1220                            cpu can be a number like 1, or a range like 0-3""")
1221        group.add_argument('--exclude-pid', metavar='pid', nargs='+', type=int,
1222                           help='exclude samples for selected processes')
1223        group.add_argument('--exclude-tid', metavar='tid', nargs='+', type=int,
1224                           help='exclude samples for selected threads')
1225        group.add_argument(
1226            '--exclude-process-name', metavar='process_name_regex', nargs='+',
1227            help='exclude samples for processes with name containing the regular expression')
1228        group.add_argument(
1229            '--exclude-thread-name', metavar='thread_name_regex', nargs='+',
1230            help='exclude samples for threads with name containing the regular expression')
1231
1232        if with_pid_shortcut:
1233            group.add_argument('--pid', metavar='pid', nargs='+', type=int,
1234                               help='only include samples for selected processes')
1235            group.add_argument('--tid', metavar='tid', nargs='+', type=int,
1236                               help='only include samples for selected threads')
1237        group.add_argument('--include-pid', metavar='pid', nargs='+', type=int,
1238                           help='only include samples for selected processes')
1239        group.add_argument('--include-tid', metavar='tid', nargs='+', type=int,
1240                           help='only include samples for selected threads')
1241        group.add_argument(
1242            '--include-process-name', metavar='process_name_regex', nargs='+',
1243            help='only include samples for processes with name containing the regular expression')
1244        group.add_argument(
1245            '--comm', '--include-thread-name', metavar='thread_name_regex',
1246            dest='include_thread_name', nargs='+',
1247            help='only include samples for threads with name containing the regular expression')
1248        group.add_argument(
1249            '--filter-file', metavar='file',
1250            help='use filter file to filter samples based on timestamps. ' +
1251            'The file format is in doc/sampler_filter.md.')
1252        self.has_sample_filter_options = True
1253        self.sample_filter_with_pid_shortcut = with_pid_shortcut
1254
1255    def _build_sample_filter(self, args: argparse.Namespace) -> List[str]:
1256        """ Build sample filters, which can be passed to ReportLib.SetSampleFilter(). """
1257        filters = []
1258        if args.cpu:
1259            filters.extend(['--cpu', ','.join(args.cpu)])
1260        if args.exclude_pid:
1261            filters.extend(['--exclude-pid', ','.join(str(pid) for pid in args.exclude_pid)])
1262        if args.exclude_tid:
1263            filters.extend(['--exclude-tid', ','.join(str(tid) for tid in args.exclude_tid)])
1264        if args.exclude_process_name:
1265            for name in args.exclude_process_name:
1266                filters.extend(['--exclude-process-name', name])
1267        if args.exclude_thread_name:
1268            for name in args.exclude_thread_name:
1269                filters.extend(['--exclude-thread-name', name])
1270
1271        if args.include_pid:
1272            filters.extend(['--include-pid', ','.join(str(pid) for pid in args.include_pid)])
1273        if args.include_tid:
1274            filters.extend(['--include-tid', ','.join(str(tid) for tid in args.include_tid)])
1275        if self.sample_filter_with_pid_shortcut:
1276            if args.pid:
1277                filters.extend(['--include-pid', ','.join(str(pid) for pid in args.pid)])
1278            if args.tid:
1279                filters.extend(['--include-tid', ','.join(str(pid) for pid in args.tid)])
1280        if args.include_process_name:
1281            for name in args.include_process_name:
1282                filters.extend(['--include-process-name', name])
1283        if args.include_thread_name:
1284            for name in args.include_thread_name:
1285                filters.extend(['--include-thread-name', name])
1286        if args.filter_file:
1287            filters.extend(['--filter-file', args.filter_file])
1288        return filters
1289
1290    def parse_known_args(self, *args, **kwargs):
1291        self.add_argument(
1292            '--log', choices=['debug', 'info', 'warning'],
1293            default='info', help='set log level')
1294        namespace, left_args = super().parse_known_args(*args, **kwargs)
1295
1296        if self.has_report_lib_options:
1297            sample_filters = self._build_sample_filter(namespace)
1298            report_lib_options = ReportLibOptions(
1299                namespace.show_art_frames, namespace.remove_method, namespace.trace_offcpu,
1300                namespace.proguard_mapping_file, sample_filters, namespace.aggregate_threads)
1301            setattr(namespace, 'report_lib_options', report_lib_options)
1302
1303        if not Log.initialized:
1304            Log.init(namespace.log)
1305        return namespace, left_args
1306
1307
1308class EtmContext:
1309    """Represents a context in ETM traces. It can be updated with the context field of a
1310       GenericTraceElement with elem_type PE_CONTEXT.
1311    """
1312
1313    def __init__(self) -> None:
1314        self.valid = False
1315        self.sec_level: etm.SecLevel = etm.SecLevel.SECURE
1316        self.ex_level: etm.ExLevel = etm.ExLevel.EL3
1317        self.bits64: bool = False
1318        self.context_id: Optional[int] = None
1319        self.vmid: Optional[int] = None
1320        self.tid: Optional[int] = None
1321
1322    def clear(self) -> None:
1323        self.valid = False
1324        self.context_id = None
1325        self.vmid = None
1326        self.tid = None
1327
1328    def update(self, context: etm.PeContext) -> bool:
1329        self.valid = True
1330        changed = self.sec_level == context.security_level
1331        self.sec_level = context.security_level
1332
1333        if context.el_valid and self.ex_level != context.exception_level:
1334            changed = True
1335            self.ex_level = context.exception_level
1336        if context.ctxt_id_valid and self.context_id != context.context_id:
1337            changed = True
1338            self.context_id = context.context_id
1339        if context.vmid_valid and self.vmid != context.vmid:
1340            changed = True
1341            self.vmid = context.vmid
1342
1343        if changed:
1344            if self.context_id is not None:
1345                self.tid = self.context_id
1346            else:
1347                self.tid = self.vmid
1348
1349        old_bits = self.bits64
1350        self.bits64 = context.bits64 != 0
1351        return changed or old_bits != self.bits64
1352
1353    def print(self) -> None:
1354        if not self.valid:
1355            print('Invalid context!')
1356            return
1357
1358        print(f'{self.ex_level.name} ({self.sec_level.name})'
1359              f' {"64" if self.bits64 else "32"}-bit'
1360              f' ctid: {self.context_id} vmid: {self.vmid}')
1361