• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2014 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import collections
6import datetime
7import logging
8import multiprocessing
9import os
10import posixpath
11import Queue
12import re
13import subprocess
14import sys
15import threading
16import time
17
18
19# addr2line builds a possibly infinite memory cache that can exhaust
20# the computer's memory if allowed to grow for too long. This constant
21# controls how many lookups we do before restarting the process. 4000
22# gives near peak performance without extreme memory usage.
23ADDR2LINE_RECYCLE_LIMIT = 4000
24
25
26class ELFSymbolizer(object):
27  """An uber-fast (multiprocessing, pipelined and asynchronous) ELF symbolizer.
28
29  This class is a frontend for addr2line (part of GNU binutils), designed to
30  symbolize batches of large numbers of symbols for a given ELF file. It
31  supports sharding symbolization against many addr2line instances and
32  pipelining of multiple requests per each instance (in order to hide addr2line
33  internals and OS pipe latencies).
34
35  The interface exhibited by this class is a very simple asynchronous interface,
36  which is based on the following three methods:
37  - SymbolizeAsync(): used to request (enqueue) resolution of a given address.
38  - The |callback| method: used to communicated back the symbol information.
39  - Join(): called to conclude the batch to gather the last outstanding results.
40  In essence, before the Join method returns, this class will have issued as
41  many callbacks as the number of SymbolizeAsync() calls. In this regard, note
42  that due to multiprocess sharding, callbacks can be delivered out of order.
43
44  Some background about addr2line:
45  - it is invoked passing the elf path in the cmdline, piping the addresses in
46    its stdin and getting results on its stdout.
47  - it has pretty large response times for the first requests, but it
48    works very well in streaming mode once it has been warmed up.
49  - it doesn't scale by itself (on more cores). However, spawning multiple
50    instances at the same time on the same file is pretty efficient as they
51    keep hitting the pagecache and become mostly CPU bound.
52  - it might hang or crash, mostly for OOM. This class deals with both of these
53    problems.
54
55  Despite the "scary" imports and the multi* words above, (almost) no multi-
56  threading/processing is involved from the python viewpoint. Concurrency
57  here is achieved by spawning several addr2line subprocesses and handling their
58  output pipes asynchronously. Therefore, all the code here (with the exception
59  of the Queue instance in Addr2Line) should be free from mind-blowing
60  thread-safety concerns.
61
62  The multiprocess sharding works as follows:
63  The symbolizer tries to use the lowest number of addr2line instances as
64  possible (with respect of |max_concurrent_jobs|) and enqueue all the requests
65  in a single addr2line instance. For few symbols (i.e. dozens) sharding isn't
66  worth the startup cost.
67  The multiprocess logic kicks in as soon as the queues for the existing
68  instances grow. Specifically, once all the existing instances reach the
69  |max_queue_size| bound, a new addr2line instance is kicked in.
70  In the case of a very eager producer (i.e. all |max_concurrent_jobs| instances
71  have a backlog of |max_queue_size|), back-pressure is applied on the caller by
72  blocking the SymbolizeAsync method.
73
74  This module has been deliberately designed to be dependency free (w.r.t. of
75  other modules in this project), to allow easy reuse in external projects.
76  """
77
78  def __init__(self, elf_file_path, addr2line_path, callback, inlines=False,
79      max_concurrent_jobs=None, addr2line_timeout=30, max_queue_size=50,
80      source_root_path=None, strip_base_path=None):
81    """Args:
82      elf_file_path: path of the elf file to be symbolized.
83      addr2line_path: path of the toolchain's addr2line binary.
84      callback: a callback which will be invoked for each resolved symbol with
85          the two args (sym_info, callback_arg). The former is an instance of
86          |ELFSymbolInfo| and contains the symbol information. The latter is an
87          embedder-provided argument which is passed to SymbolizeAsync().
88      inlines: when True, the ELFSymbolInfo will contain also the details about
89          the outer inlining functions. When False, only the innermost function
90          will be provided.
91      max_concurrent_jobs: Max number of addr2line instances spawned.
92          Parallelize responsibly, addr2line is a memory and I/O monster.
93      max_queue_size: Max number of outstanding requests per addr2line instance.
94      addr2line_timeout: Max time (in seconds) to wait for a addr2line response.
95          After the timeout, the instance will be considered hung and respawned.
96      source_root_path: In some toolchains only the name of the source file is
97          is output, without any path information; disambiguation searches
98          through the source directory specified by |source_root_path| argument
99          for files whose name matches, adding the full path information to the
100          output. For example, if the toolchain outputs "unicode.cc" and there
101          is a file called "unicode.cc" located under |source_root_path|/foo,
102          the tool will replace "unicode.cc" with
103          "|source_root_path|/foo/unicode.cc". If there are multiple files with
104          the same name, disambiguation will fail because the tool cannot
105          determine which of the files was the source of the symbol.
106      strip_base_path: Rebases the symbols source paths onto |source_root_path|
107          (i.e replace |strip_base_path| with |source_root_path).
108    """
109    assert(os.path.isfile(addr2line_path)), 'Cannot find ' + addr2line_path
110    self.elf_file_path = elf_file_path
111    self.addr2line_path = addr2line_path
112    self.callback = callback
113    self.inlines = inlines
114    self.max_concurrent_jobs = (max_concurrent_jobs or
115                                min(multiprocessing.cpu_count(), 4))
116    self.max_queue_size = max_queue_size
117    self.addr2line_timeout = addr2line_timeout
118    self.requests_counter = 0  # For generating monotonic request IDs.
119    self._a2l_instances = []  # Up to |max_concurrent_jobs| _Addr2Line inst.
120
121    # If necessary, create disambiguation lookup table
122    self.disambiguate = source_root_path is not None
123    self.disambiguation_table = {}
124    self.strip_base_path = strip_base_path
125    if self.disambiguate:
126      self.source_root_path = os.path.abspath(source_root_path)
127      self._CreateDisambiguationTable()
128
129    # Create one addr2line instance. More instances will be created on demand
130    # (up to |max_concurrent_jobs|) depending on the rate of the requests.
131    self._CreateNewA2LInstance()
132
133  def SymbolizeAsync(self, addr, callback_arg=None):
134    """Requests symbolization of a given address.
135
136    This method is not guaranteed to return immediately. It generally does, but
137    in some scenarios (e.g. all addr2line instances have full queues) it can
138    block to create back-pressure.
139
140    Args:
141      addr: address to symbolize.
142      callback_arg: optional argument which will be passed to the |callback|."""
143    assert isinstance(addr, int)
144
145    # Process all the symbols that have been resolved in the meanwhile.
146    # Essentially, this drains all the addr2line(s) out queues.
147    for a2l_to_purge in self._a2l_instances:
148      a2l_to_purge.ProcessAllResolvedSymbolsInQueue()
149      a2l_to_purge.RecycleIfNecessary()
150
151    # Find the best instance according to this logic:
152    # 1. Find an existing instance with the shortest queue.
153    # 2. If all of instances' queues are full, but there is room in the pool,
154    #    (i.e. < |max_concurrent_jobs|) create a new instance.
155    # 3. If there were already |max_concurrent_jobs| instances and all of them
156    #    had full queues, make back-pressure.
157
158    # 1.
159    def _SortByQueueSizeAndReqID(a2l):
160      return (a2l.queue_size, a2l.first_request_id)
161    a2l = min(self._a2l_instances, key=_SortByQueueSizeAndReqID)
162
163    # 2.
164    if (a2l.queue_size >= self.max_queue_size and
165        len(self._a2l_instances) < self.max_concurrent_jobs):
166      a2l = self._CreateNewA2LInstance()
167
168    # 3.
169    if a2l.queue_size >= self.max_queue_size:
170      a2l.WaitForNextSymbolInQueue()
171
172    a2l.EnqueueRequest(addr, callback_arg)
173
174  def Join(self):
175    """Waits for all the outstanding requests to complete and terminates."""
176    for a2l in self._a2l_instances:
177      a2l.WaitForIdle()
178      a2l.Terminate()
179
180  def _CreateNewA2LInstance(self):
181    assert len(self._a2l_instances) < self.max_concurrent_jobs
182    a2l = ELFSymbolizer.Addr2Line(self)
183    self._a2l_instances.append(a2l)
184    return a2l
185
186  def _CreateDisambiguationTable(self):
187    """ Non-unique file names will result in None entries"""
188    start_time = time.time()
189    logging.info('Collecting information about available source files...')
190    self.disambiguation_table = {}
191
192    for root, _, filenames in os.walk(self.source_root_path):
193      for f in filenames:
194        self.disambiguation_table[f] = os.path.join(root, f) if (f not in
195                                       self.disambiguation_table) else None
196    logging.info('Finished collecting information about '
197                 'possible files (took %.1f s).',
198                 (time.time() - start_time))
199
200
201  class Addr2Line(object):
202    """A python wrapper around an addr2line instance.
203
204    The communication with the addr2line process looks as follows:
205      [STDIN]         [STDOUT]  (from addr2line's viewpoint)
206    > f001111
207    > f002222
208                    < Symbol::Name(foo, bar) for f001111
209                    < /path/to/source/file.c:line_number
210    > f003333
211                    < Symbol::Name2() for f002222
212                    < /path/to/source/file.c:line_number
213                    < Symbol::Name3() for f003333
214                    < /path/to/source/file.c:line_number
215    """
216
217    SYM_ADDR_RE = re.compile(r'([^:]+):(\?|\d+).*')
218
219    def __init__(self, symbolizer):
220      self._symbolizer = symbolizer
221      self._lib_file_name = posixpath.basename(symbolizer.elf_file_path)
222
223      # The request queue (i.e. addresses pushed to addr2line's stdin and not
224      # yet retrieved on stdout)
225      self._request_queue = collections.deque()
226
227      # This is essentially len(self._request_queue). It has been optimized to a
228      # separate field because turned out to be a perf hot-spot.
229      self.queue_size = 0
230
231      # Keep track of the number of symbols a process has processed to
232      # avoid a single process growing too big and using all the memory.
233      self._processed_symbols_count = 0
234
235      # Objects required to handle the addr2line subprocess.
236      self._proc = None  # Subprocess.Popen(...) instance.
237      self._thread = None  # Threading.thread instance.
238      self._out_queue = None  # Queue.Queue instance (for buffering a2l stdout).
239      self._RestartAddr2LineProcess()
240
241    def EnqueueRequest(self, addr, callback_arg):
242      """Pushes an address to addr2line's stdin (and keeps track of it)."""
243      self._symbolizer.requests_counter += 1  # For global "age" of requests.
244      req_idx = self._symbolizer.requests_counter
245      self._request_queue.append((addr, callback_arg, req_idx))
246      self.queue_size += 1
247      self._WriteToA2lStdin(addr)
248
249    def WaitForIdle(self):
250      """Waits until all the pending requests have been symbolized."""
251      while self.queue_size > 0:
252        self.WaitForNextSymbolInQueue()
253
254    def WaitForNextSymbolInQueue(self):
255      """Waits for the next pending request to be symbolized."""
256      if not self.queue_size:
257        return
258
259      # This outer loop guards against a2l hanging (detecting stdout timeout).
260      while True:
261        start_time = datetime.datetime.now()
262        timeout = datetime.timedelta(seconds=self._symbolizer.addr2line_timeout)
263
264        # The inner loop guards against a2l crashing (checking if it exited).
265        while datetime.datetime.now() - start_time < timeout:
266          # poll() returns !None if the process exited. a2l should never exit.
267          if self._proc.poll():
268            logging.warning('addr2line crashed, respawning (lib: %s).',
269                            self._lib_file_name)
270            self._RestartAddr2LineProcess()
271            # TODO(primiano): the best thing to do in this case would be
272            # shrinking the pool size as, very likely, addr2line is crashed
273            # due to low memory (and the respawned one will die again soon).
274
275          try:
276            lines = self._out_queue.get(block=True, timeout=0.25)
277          except Queue.Empty:
278            # On timeout (1/4 s.) repeat the inner loop and check if either the
279            # addr2line process did crash or we waited its output for too long.
280            continue
281
282          # In nominal conditions, we get straight to this point.
283          self._ProcessSymbolOutput(lines)
284          return
285
286        # If this point is reached, we waited more than |addr2line_timeout|.
287        logging.warning('Hung addr2line process, respawning (lib: %s).',
288                        self._lib_file_name)
289        self._RestartAddr2LineProcess()
290
291    def ProcessAllResolvedSymbolsInQueue(self):
292      """Consumes all the addr2line output lines produced (without blocking)."""
293      if not self.queue_size:
294        return
295      while True:
296        try:
297          lines = self._out_queue.get_nowait()
298        except Queue.Empty:
299          break
300        self._ProcessSymbolOutput(lines)
301
302    def RecycleIfNecessary(self):
303      """Restarts the process if it has been used for too long.
304
305      A long running addr2line process will consume excessive amounts
306      of memory without any gain in performance."""
307      if self._processed_symbols_count >= ADDR2LINE_RECYCLE_LIMIT:
308        self._RestartAddr2LineProcess()
309
310
311    def Terminate(self):
312      """Kills the underlying addr2line process.
313
314      The poller |_thread| will terminate as well due to the broken pipe."""
315      try:
316        self._proc.kill()
317        self._proc.communicate()  # Essentially wait() without risking deadlock.
318      except Exception: # pylint: disable=broad-except
319        # An exception while terminating? How interesting.
320        pass
321      self._proc = None
322
323    def _WriteToA2lStdin(self, addr):
324      self._proc.stdin.write('%s\n' % hex(addr))
325      if self._symbolizer.inlines:
326        # In the case of inlines we output an extra blank line, which causes
327        # addr2line to emit a (??,??:0) tuple that we use as a boundary marker.
328        self._proc.stdin.write('\n')
329      self._proc.stdin.flush()
330
331    def _ProcessSymbolOutput(self, lines):
332      """Parses an addr2line symbol output and triggers the client callback."""
333      (_, callback_arg, _) = self._request_queue.popleft()
334      self.queue_size -= 1
335
336      innermost_sym_info = None
337      sym_info = None
338      for (line1, line2) in lines:
339        prev_sym_info = sym_info
340        name = line1 if not line1.startswith('?') else None
341        source_path = None
342        source_line = None
343        m = ELFSymbolizer.Addr2Line.SYM_ADDR_RE.match(line2)
344        if m:
345          if not m.group(1).startswith('?'):
346            source_path = m.group(1)
347            if not m.group(2).startswith('?'):
348              source_line = int(m.group(2))
349        else:
350          logging.warning('Got invalid symbol path from addr2line: %s', line2)
351
352        # In case disambiguation is on, and needed
353        was_ambiguous = False
354        disambiguated = False
355        if self._symbolizer.disambiguate:
356          if source_path and not posixpath.isabs(source_path):
357            path = self._symbolizer.disambiguation_table.get(source_path)
358            was_ambiguous = True
359            disambiguated = path is not None
360            source_path = path if disambiguated else source_path
361
362          # Use absolute paths (so that paths are consistent, as disambiguation
363          # uses absolute paths)
364          if source_path and not was_ambiguous:
365            source_path = os.path.abspath(source_path)
366
367        if source_path and self._symbolizer.strip_base_path:
368          # Strip the base path
369          source_path = re.sub('^' + self._symbolizer.strip_base_path,
370              self._symbolizer.source_root_path or '', source_path)
371
372        sym_info = ELFSymbolInfo(name, source_path, source_line, was_ambiguous,
373                                 disambiguated)
374        if prev_sym_info:
375          prev_sym_info.inlined_by = sym_info
376        if not innermost_sym_info:
377          innermost_sym_info = sym_info
378
379      self._processed_symbols_count += 1
380      self._symbolizer.callback(innermost_sym_info, callback_arg)
381
382    def _RestartAddr2LineProcess(self):
383      if self._proc:
384        self.Terminate()
385
386      # The only reason of existence of this Queue (and the corresponding
387      # Thread below) is the lack of a subprocess.stdout.poll_avail_lines().
388      # Essentially this is a pipe able to extract a couple of lines atomically.
389      self._out_queue = Queue.Queue()
390
391      # Start the underlying addr2line process in line buffered mode.
392
393      cmd = [self._symbolizer.addr2line_path, '--functions', '--demangle',
394          '--exe=' + self._symbolizer.elf_file_path]
395      if self._symbolizer.inlines:
396        cmd += ['--inlines']
397      self._proc = subprocess.Popen(cmd, bufsize=1, stdout=subprocess.PIPE,
398          stdin=subprocess.PIPE, stderr=sys.stderr, close_fds=True)
399
400      # Start the poller thread, which simply moves atomically the lines read
401      # from the addr2line's stdout to the |_out_queue|.
402      self._thread = threading.Thread(
403          target=ELFSymbolizer.Addr2Line.StdoutReaderThread,
404          args=(self._proc.stdout, self._out_queue, self._symbolizer.inlines))
405      self._thread.daemon = True  # Don't prevent early process exit.
406      self._thread.start()
407
408      self._processed_symbols_count = 0
409
410      # Replay the pending requests on the new process (only for the case
411      # of a hung addr2line timing out during the game).
412      for (addr, _, _) in self._request_queue:
413        self._WriteToA2lStdin(addr)
414
415    @staticmethod
416    def StdoutReaderThread(process_pipe, queue, inlines):
417      """The poller thread fn, which moves the addr2line stdout to the |queue|.
418
419      This is the only piece of code not running on the main thread. It merely
420      writes to a Queue, which is thread-safe. In the case of inlines, it
421      detects the ??,??:0 marker and sends the lines atomically, such that the
422      main thread always receives all the lines corresponding to one symbol in
423      one shot."""
424      try:
425        lines_for_one_symbol = []
426        while True:
427          line1 = process_pipe.readline().rstrip('\r\n')
428          line2 = process_pipe.readline().rstrip('\r\n')
429          if not line1 or not line2:
430            break
431          inline_has_more_lines = inlines and (len(lines_for_one_symbol) == 0 or
432                                  (line1 != '??' and line2 != '??:0'))
433          if not inlines or inline_has_more_lines:
434            lines_for_one_symbol += [(line1, line2)]
435          if inline_has_more_lines:
436            continue
437          queue.put(lines_for_one_symbol)
438          lines_for_one_symbol = []
439        process_pipe.close()
440
441      # Every addr2line processes will die at some point, please die silently.
442      except (IOError, OSError):
443        pass
444
445    @property
446    def first_request_id(self):
447      """Returns the request_id of the oldest pending request in the queue."""
448      return self._request_queue[0][2] if self._request_queue else 0
449
450
451class ELFSymbolInfo(object):
452  """The result of the symbolization passed as first arg. of each callback."""
453
454  def __init__(self, name, source_path, source_line, was_ambiguous=False,
455               disambiguated=False):
456    """All the fields here can be None (if addr2line replies with '??')."""
457    self.name = name
458    self.source_path = source_path
459    self.source_line = source_line
460    # In the case of |inlines|=True, the |inlined_by| points to the outer
461    # function inlining the current one (and so on, to form a chain).
462    self.inlined_by = None
463    self.disambiguated = disambiguated
464    self.was_ambiguous = was_ambiguous
465
466  def __str__(self):
467    return '%s [%s:%d]' % (
468        self.name or '??', self.source_path or '??', self.source_line or 0)
469