• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2020 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Heap visualizer of ASCII characters."""
15
16import argparse
17import sys
18import math
19import logging
20from typing import Optional
21from dataclasses import dataclass, field
22import coloredlogs  # type: ignore
23
24
25@dataclass
26class HeapBlock:
27    """Building blocks for memory chunk allocated at heap."""
28
29    size: int
30    mem_offset: int
31    next: Optional['HeapBlock'] = None
32
33
34@dataclass
35class HeapUsage:
36    """Contains a linked list of allocated HeapBlocks."""
37
38    # Use a default_factory to avoid mutable default value. See
39    # https://docs.python.org/3/library/dataclasses.html#mutable-default-values
40    begin: HeapBlock = field(default_factory=lambda: HeapBlock(0, 0))
41
42    def add_block(self, block):
43        cur_block = self.begin.next
44        prev_block = self.begin
45        while cur_block is not None:
46            if cur_block.mem_offset == block.mem_offset:
47                return
48            if cur_block.mem_offset < block.mem_offset:
49                prev_block = cur_block
50                cur_block = cur_block.next
51            else:
52                block.next = cur_block
53                prev_block.next = block
54                return
55        prev_block.next = block
56
57    def remove_block(self, address):
58        cur_block = self.begin.next
59        prev_block = self.begin
60        while cur_block is not None:
61            if cur_block.mem_offset == address:
62                prev_block.next = cur_block.next
63                return
64            if cur_block.mem_offset < address:
65                prev_block = cur_block
66                cur_block = cur_block.next
67            else:
68                return
69
70
71def add_parser_arguments(parser):
72    """Parse args."""
73    parser.add_argument(
74        '--dump-file',
75        help=(
76            'dump file that contains a list of malloc and '
77            'free instructions. The format should be as '
78            'follows: "m <size> <address>" on a line for '
79            'each malloc called and "f <address>" on a line '
80            'for each free called.'
81        ),
82        required=True,
83    )
84
85    parser.add_argument(
86        '--heap-low-address',
87        help=('lower address of the heap.'),
88        type=lambda x: int(x, 0),
89        required=True,
90    )
91
92    parser.add_argument(
93        '--heap-high-address',
94        help=('higher address of the heap.'),
95        type=lambda x: int(x, 0),
96        required=True,
97    )
98
99    parser.add_argument(
100        '--poison-enabled',
101        help=('if heap poison is enabled or not.'),
102        default=False,
103        action='store_true',
104    )
105
106    parser.add_argument(
107        '--pointer-size',
108        help=('size of pointer on the machine.'),
109        default=4,
110        type=lambda x: int(x, 0),
111    )
112
113
114_LEFT_HEADER_CHAR = '['
115_RIGHT_HEADER_CHAR = ']'
116_USED_CHAR = '*'
117_FREE_CHAR = ' '
118_CHARACTERS_PER_LINE = 64
119_BYTES_PER_CHARACTER = 4
120_LOG = logging.getLogger(__name__)
121
122
123def _exit_due_to_file_not_found():
124    _LOG.critical(
125        'Dump file location is not provided or dump file is not '
126        'found. Please specify a valid file in the argument.'
127    )
128    sys.exit(1)
129
130
131def _exit_due_to_bad_heap_info():
132    _LOG.critical(
133        'Heap low/high address is missing or invalid. Please put valid '
134        'addresses in the argument.'
135    )
136    sys.exit(1)
137
138
139def visualize(
140    dump_file=None,
141    heap_low_address=None,
142    heap_high_address=None,
143    poison_enabled=False,
144    pointer_size=4,
145):
146    """Visualization of heap usage."""
147    # TODO(b/235282507): Add standarized mechanisms to produce dump file and
148    # read heap information from dump file.
149    aligned_bytes = pointer_size
150    header_size = pointer_size * 2
151
152    try:
153        if heap_high_address < heap_low_address:
154            _exit_due_to_bad_heap_info()
155        heap_size = heap_high_address - heap_low_address
156    except TypeError:
157        _exit_due_to_bad_heap_info()
158
159    if poison_enabled:
160        poison_offset = pointer_size
161    else:
162        poison_offset = 0
163
164    try:
165        allocation_dump = open(dump_file, 'r')
166    except (FileNotFoundError, TypeError):
167        _exit_due_to_file_not_found()
168
169    heap_visualizer = HeapUsage()
170    # Parse the dump file.
171    for line in allocation_dump:
172        info = line[:-1].split(' ')
173        if info[0] == 'm':
174            # Add a HeapBlock when malloc is called
175            block = HeapBlock(
176                int(math.ceil(float(info[1]) / aligned_bytes)) * aligned_bytes,
177                int(info[2], 0) - heap_low_address,
178            )
179            heap_visualizer.add_block(block)
180        elif info[0] == 'f':
181            # Remove the HeapBlock when free is called
182            heap_visualizer.remove_block(int(info[1], 0) - heap_low_address)
183
184    # next_block indicates the nearest HeapBlock that hasn't finished
185    # printing.
186    next_block = heap_visualizer.begin
187    if next_block.next is None:
188        next_mem_offset = heap_size + header_size + poison_offset + 1
189        next_size = 0
190    else:
191        next_mem_offset = next_block.next.mem_offset
192        next_size = next_block.next.size
193
194    # Flags to indicate status of the 4 bytes going to be printed.
195    is_left_header = False
196    is_right_header = False
197    is_used = False
198
199    # Print overall heap information
200    _LOG.info(
201        '%-40s%-40s',
202        f'The heap starts at {hex(heap_low_address)}.',
203        f'The heap ends at {hex(heap_high_address)}.',
204    )
205    _LOG.info(
206        '%-40s%-40s',
207        f'Heap size is {heap_size // 1024}k bytes.',
208        f'Heap is aligned by {aligned_bytes} bytes.',
209    )
210    if poison_offset != 0:
211        _LOG.info(
212            'Poison is enabled %d bytes before and after the usable '
213            'space of each block.',
214            poison_offset,
215        )
216    else:
217        _LOG.info('%-40s', 'Poison is disabled.')
218    _LOG.info(
219        '%-40s',
220        'Below is the visualization of the heap. '
221        'Each character represents 4 bytes.',
222    )
223    _LOG.info('%-40s', f"    '{_FREE_CHAR}' indicates free space.")
224    _LOG.info('%-40s', f"    '{_USED_CHAR}' indicates used space.")
225    _LOG.info(
226        '%-40s',
227        f"    '{_LEFT_HEADER_CHAR}' indicates header or "
228        'poisoned space before the block.',
229    )
230    _LOG.info(
231        '%-40s',
232        f"    '{_RIGHT_HEADER_CHAR}' poisoned space after " 'the block.',
233    )
234    print()
235
236    # Go over the heap space where there will be 64 characters each line.
237    for line_base_address in range(
238        0, heap_size, _CHARACTERS_PER_LINE * _BYTES_PER_CHARACTER
239    ):
240        # Print the heap address of the current line.
241        sys.stdout.write(
242            f"{' ': <13}"
243            f'{hex(heap_low_address + line_base_address)}'
244            f"{f' (+{line_base_address}):': <12}"
245        )
246        for line_offset in range(
247            0, _CHARACTERS_PER_LINE * _BYTES_PER_CHARACTER, _BYTES_PER_CHARACTER
248        ):
249            # Determine if the current 4 bytes is used, unused, or is a
250            # header.
251            # The case that we have went over the previous block and will
252            # turn to the next block.
253            current_address = line_base_address + line_offset
254            if current_address == next_mem_offset + next_size + poison_offset:
255                next_block = next_block.next
256                # If this is the last block, set nextMemOffset to be over
257                # the last byte of heap so that the rest of the heap will
258                # be printed out as unused.
259                # Otherwise set the next HeapBlock allocated.
260                if next_block.next is None:
261                    next_mem_offset = (
262                        heap_size + header_size + poison_offset + 1
263                    )
264                    next_size = 0
265                else:
266                    next_mem_offset = next_block.next.mem_offset
267                    next_size = next_block.next.size
268
269            # Determine the status of the current 4 bytes.
270            if (
271                next_mem_offset - header_size - poison_offset
272                <= current_address
273                < next_mem_offset
274            ):
275                is_left_header = True
276                is_right_header = False
277                is_used = False
278            elif (
279                next_mem_offset <= current_address < next_mem_offset + next_size
280            ):
281                is_left_header = False
282                is_right_header = False
283                is_used = True
284            elif (
285                next_mem_offset + next_size
286                <= current_address
287                < next_mem_offset + next_size + poison_offset
288            ):
289                is_left_header = False
290                is_right_header = True
291                is_used = False
292            else:
293                is_left_header = False
294                is_right_header = False
295                is_used = False
296
297            if is_left_header:
298                sys.stdout.write(_LEFT_HEADER_CHAR)
299            elif is_right_header:
300                sys.stdout.write(_RIGHT_HEADER_CHAR)
301            elif is_used:
302                sys.stdout.write(_USED_CHAR)
303            else:
304                sys.stdout.write(_FREE_CHAR)
305        sys.stdout.write('\n')
306
307    allocation_dump.close()
308
309
310def main():
311    """A python script to visualize heap usage given a dump file."""
312    parser = argparse.ArgumentParser(description=main.__doc__)
313    add_parser_arguments(parser)
314    # Try to use pw_cli logs, else default to something reasonable.
315    try:
316        import pw_cli.log  # pylint: disable=import-outside-toplevel
317
318        pw_cli.log.install()
319    except ImportError:
320        coloredlogs.install(
321            level='INFO',
322            level_styles={'debug': {'color': 244}, 'error': {'color': 'red'}},
323            fmt='%(asctime)s %(levelname)s | %(message)s',
324        )
325    visualize(**vars(parser.parse_args()))
326
327
328if __name__ == "__main__":
329    main()
330