1# Copyright 2014 The Chromium Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import collections 6import datetime 7import logging 8import multiprocessing 9import os 10import posixpath 11import Queue 12import re 13import subprocess 14import sys 15import threading 16import time 17 18 19# addr2line builds a possibly infinite memory cache that can exhaust 20# the computer's memory if allowed to grow for too long. This constant 21# controls how many lookups we do before restarting the process. 4000 22# gives near peak performance without extreme memory usage. 23ADDR2LINE_RECYCLE_LIMIT = 4000 24 25 26class ELFSymbolizer(object): 27 """An uber-fast (multiprocessing, pipelined and asynchronous) ELF symbolizer. 28 29 This class is a frontend for addr2line (part of GNU binutils), designed to 30 symbolize batches of large numbers of symbols for a given ELF file. It 31 supports sharding symbolization against many addr2line instances and 32 pipelining of multiple requests per each instance (in order to hide addr2line 33 internals and OS pipe latencies). 34 35 The interface exhibited by this class is a very simple asynchronous interface, 36 which is based on the following three methods: 37 - SymbolizeAsync(): used to request (enqueue) resolution of a given address. 38 - The |callback| method: used to communicated back the symbol information. 39 - Join(): called to conclude the batch to gather the last outstanding results. 40 In essence, before the Join method returns, this class will have issued as 41 many callbacks as the number of SymbolizeAsync() calls. In this regard, note 42 that due to multiprocess sharding, callbacks can be delivered out of order. 43 44 Some background about addr2line: 45 - it is invoked passing the elf path in the cmdline, piping the addresses in 46 its stdin and getting results on its stdout. 47 - it has pretty large response times for the first requests, but it 48 works very well in streaming mode once it has been warmed up. 49 - it doesn't scale by itself (on more cores). However, spawning multiple 50 instances at the same time on the same file is pretty efficient as they 51 keep hitting the pagecache and become mostly CPU bound. 52 - it might hang or crash, mostly for OOM. This class deals with both of these 53 problems. 54 55 Despite the "scary" imports and the multi* words above, (almost) no multi- 56 threading/processing is involved from the python viewpoint. Concurrency 57 here is achieved by spawning several addr2line subprocesses and handling their 58 output pipes asynchronously. Therefore, all the code here (with the exception 59 of the Queue instance in Addr2Line) should be free from mind-blowing 60 thread-safety concerns. 61 62 The multiprocess sharding works as follows: 63 The symbolizer tries to use the lowest number of addr2line instances as 64 possible (with respect of |max_concurrent_jobs|) and enqueue all the requests 65 in a single addr2line instance. For few symbols (i.e. dozens) sharding isn't 66 worth the startup cost. 67 The multiprocess logic kicks in as soon as the queues for the existing 68 instances grow. Specifically, once all the existing instances reach the 69 |max_queue_size| bound, a new addr2line instance is kicked in. 70 In the case of a very eager producer (i.e. all |max_concurrent_jobs| instances 71 have a backlog of |max_queue_size|), back-pressure is applied on the caller by 72 blocking the SymbolizeAsync method. 73 74 This module has been deliberately designed to be dependency free (w.r.t. of 75 other modules in this project), to allow easy reuse in external projects. 76 """ 77 78 def __init__(self, elf_file_path, addr2line_path, callback, inlines=False, 79 max_concurrent_jobs=None, addr2line_timeout=30, max_queue_size=50, 80 source_root_path=None, strip_base_path=None): 81 """Args: 82 elf_file_path: path of the elf file to be symbolized. 83 addr2line_path: path of the toolchain's addr2line binary. 84 callback: a callback which will be invoked for each resolved symbol with 85 the two args (sym_info, callback_arg). The former is an instance of 86 |ELFSymbolInfo| and contains the symbol information. The latter is an 87 embedder-provided argument which is passed to SymbolizeAsync(). 88 inlines: when True, the ELFSymbolInfo will contain also the details about 89 the outer inlining functions. When False, only the innermost function 90 will be provided. 91 max_concurrent_jobs: Max number of addr2line instances spawned. 92 Parallelize responsibly, addr2line is a memory and I/O monster. 93 max_queue_size: Max number of outstanding requests per addr2line instance. 94 addr2line_timeout: Max time (in seconds) to wait for a addr2line response. 95 After the timeout, the instance will be considered hung and respawned. 96 source_root_path: In some toolchains only the name of the source file is 97 is output, without any path information; disambiguation searches 98 through the source directory specified by |source_root_path| argument 99 for files whose name matches, adding the full path information to the 100 output. For example, if the toolchain outputs "unicode.cc" and there 101 is a file called "unicode.cc" located under |source_root_path|/foo, 102 the tool will replace "unicode.cc" with 103 "|source_root_path|/foo/unicode.cc". If there are multiple files with 104 the same name, disambiguation will fail because the tool cannot 105 determine which of the files was the source of the symbol. 106 strip_base_path: Rebases the symbols source paths onto |source_root_path| 107 (i.e replace |strip_base_path| with |source_root_path). 108 """ 109 assert(os.path.isfile(addr2line_path)), 'Cannot find ' + addr2line_path 110 self.elf_file_path = elf_file_path 111 self.addr2line_path = addr2line_path 112 self.callback = callback 113 self.inlines = inlines 114 self.max_concurrent_jobs = (max_concurrent_jobs or 115 min(multiprocessing.cpu_count(), 4)) 116 self.max_queue_size = max_queue_size 117 self.addr2line_timeout = addr2line_timeout 118 self.requests_counter = 0 # For generating monotonic request IDs. 119 self._a2l_instances = [] # Up to |max_concurrent_jobs| _Addr2Line inst. 120 121 # If necessary, create disambiguation lookup table 122 self.disambiguate = source_root_path is not None 123 self.disambiguation_table = {} 124 self.strip_base_path = strip_base_path 125 if self.disambiguate: 126 self.source_root_path = os.path.abspath(source_root_path) 127 self._CreateDisambiguationTable() 128 129 # Create one addr2line instance. More instances will be created on demand 130 # (up to |max_concurrent_jobs|) depending on the rate of the requests. 131 self._CreateNewA2LInstance() 132 133 def SymbolizeAsync(self, addr, callback_arg=None): 134 """Requests symbolization of a given address. 135 136 This method is not guaranteed to return immediately. It generally does, but 137 in some scenarios (e.g. all addr2line instances have full queues) it can 138 block to create back-pressure. 139 140 Args: 141 addr: address to symbolize. 142 callback_arg: optional argument which will be passed to the |callback|.""" 143 assert isinstance(addr, int) 144 145 # Process all the symbols that have been resolved in the meanwhile. 146 # Essentially, this drains all the addr2line(s) out queues. 147 for a2l_to_purge in self._a2l_instances: 148 a2l_to_purge.ProcessAllResolvedSymbolsInQueue() 149 a2l_to_purge.RecycleIfNecessary() 150 151 # Find the best instance according to this logic: 152 # 1. Find an existing instance with the shortest queue. 153 # 2. If all of instances' queues are full, but there is room in the pool, 154 # (i.e. < |max_concurrent_jobs|) create a new instance. 155 # 3. If there were already |max_concurrent_jobs| instances and all of them 156 # had full queues, make back-pressure. 157 158 # 1. 159 def _SortByQueueSizeAndReqID(a2l): 160 return (a2l.queue_size, a2l.first_request_id) 161 a2l = min(self._a2l_instances, key=_SortByQueueSizeAndReqID) 162 163 # 2. 164 if (a2l.queue_size >= self.max_queue_size and 165 len(self._a2l_instances) < self.max_concurrent_jobs): 166 a2l = self._CreateNewA2LInstance() 167 168 # 3. 169 if a2l.queue_size >= self.max_queue_size: 170 a2l.WaitForNextSymbolInQueue() 171 172 a2l.EnqueueRequest(addr, callback_arg) 173 174 def Join(self): 175 """Waits for all the outstanding requests to complete and terminates.""" 176 for a2l in self._a2l_instances: 177 a2l.WaitForIdle() 178 a2l.Terminate() 179 180 def _CreateNewA2LInstance(self): 181 assert len(self._a2l_instances) < self.max_concurrent_jobs 182 a2l = ELFSymbolizer.Addr2Line(self) 183 self._a2l_instances.append(a2l) 184 return a2l 185 186 def _CreateDisambiguationTable(self): 187 """ Non-unique file names will result in None entries""" 188 start_time = time.time() 189 logging.info('Collecting information about available source files...') 190 self.disambiguation_table = {} 191 192 for root, _, filenames in os.walk(self.source_root_path): 193 for f in filenames: 194 self.disambiguation_table[f] = os.path.join(root, f) if (f not in 195 self.disambiguation_table) else None 196 logging.info('Finished collecting information about ' 197 'possible files (took %.1f s).', 198 (time.time() - start_time)) 199 200 201 class Addr2Line(object): 202 """A python wrapper around an addr2line instance. 203 204 The communication with the addr2line process looks as follows: 205 [STDIN] [STDOUT] (from addr2line's viewpoint) 206 > f001111 207 > f002222 208 < Symbol::Name(foo, bar) for f001111 209 < /path/to/source/file.c:line_number 210 > f003333 211 < Symbol::Name2() for f002222 212 < /path/to/source/file.c:line_number 213 < Symbol::Name3() for f003333 214 < /path/to/source/file.c:line_number 215 """ 216 217 SYM_ADDR_RE = re.compile(r'([^:]+):(\?|\d+).*') 218 219 def __init__(self, symbolizer): 220 self._symbolizer = symbolizer 221 self._lib_file_name = posixpath.basename(symbolizer.elf_file_path) 222 223 # The request queue (i.e. addresses pushed to addr2line's stdin and not 224 # yet retrieved on stdout) 225 self._request_queue = collections.deque() 226 227 # This is essentially len(self._request_queue). It has been optimized to a 228 # separate field because turned out to be a perf hot-spot. 229 self.queue_size = 0 230 231 # Keep track of the number of symbols a process has processed to 232 # avoid a single process growing too big and using all the memory. 233 self._processed_symbols_count = 0 234 235 # Objects required to handle the addr2line subprocess. 236 self._proc = None # Subprocess.Popen(...) instance. 237 self._thread = None # Threading.thread instance. 238 self._out_queue = None # Queue.Queue instance (for buffering a2l stdout). 239 self._RestartAddr2LineProcess() 240 241 def EnqueueRequest(self, addr, callback_arg): 242 """Pushes an address to addr2line's stdin (and keeps track of it).""" 243 self._symbolizer.requests_counter += 1 # For global "age" of requests. 244 req_idx = self._symbolizer.requests_counter 245 self._request_queue.append((addr, callback_arg, req_idx)) 246 self.queue_size += 1 247 self._WriteToA2lStdin(addr) 248 249 def WaitForIdle(self): 250 """Waits until all the pending requests have been symbolized.""" 251 while self.queue_size > 0: 252 self.WaitForNextSymbolInQueue() 253 254 def WaitForNextSymbolInQueue(self): 255 """Waits for the next pending request to be symbolized.""" 256 if not self.queue_size: 257 return 258 259 # This outer loop guards against a2l hanging (detecting stdout timeout). 260 while True: 261 start_time = datetime.datetime.now() 262 timeout = datetime.timedelta(seconds=self._symbolizer.addr2line_timeout) 263 264 # The inner loop guards against a2l crashing (checking if it exited). 265 while datetime.datetime.now() - start_time < timeout: 266 # poll() returns !None if the process exited. a2l should never exit. 267 if self._proc.poll(): 268 logging.warning('addr2line crashed, respawning (lib: %s).', 269 self._lib_file_name) 270 self._RestartAddr2LineProcess() 271 # TODO(primiano): the best thing to do in this case would be 272 # shrinking the pool size as, very likely, addr2line is crashed 273 # due to low memory (and the respawned one will die again soon). 274 275 try: 276 lines = self._out_queue.get(block=True, timeout=0.25) 277 except Queue.Empty: 278 # On timeout (1/4 s.) repeat the inner loop and check if either the 279 # addr2line process did crash or we waited its output for too long. 280 continue 281 282 # In nominal conditions, we get straight to this point. 283 self._ProcessSymbolOutput(lines) 284 return 285 286 # If this point is reached, we waited more than |addr2line_timeout|. 287 logging.warning('Hung addr2line process, respawning (lib: %s).', 288 self._lib_file_name) 289 self._RestartAddr2LineProcess() 290 291 def ProcessAllResolvedSymbolsInQueue(self): 292 """Consumes all the addr2line output lines produced (without blocking).""" 293 if not self.queue_size: 294 return 295 while True: 296 try: 297 lines = self._out_queue.get_nowait() 298 except Queue.Empty: 299 break 300 self._ProcessSymbolOutput(lines) 301 302 def RecycleIfNecessary(self): 303 """Restarts the process if it has been used for too long. 304 305 A long running addr2line process will consume excessive amounts 306 of memory without any gain in performance.""" 307 if self._processed_symbols_count >= ADDR2LINE_RECYCLE_LIMIT: 308 self._RestartAddr2LineProcess() 309 310 311 def Terminate(self): 312 """Kills the underlying addr2line process. 313 314 The poller |_thread| will terminate as well due to the broken pipe.""" 315 try: 316 self._proc.kill() 317 self._proc.communicate() # Essentially wait() without risking deadlock. 318 except Exception: # pylint: disable=broad-except 319 # An exception while terminating? How interesting. 320 pass 321 self._proc = None 322 323 def _WriteToA2lStdin(self, addr): 324 self._proc.stdin.write('%s\n' % hex(addr)) 325 if self._symbolizer.inlines: 326 # In the case of inlines we output an extra blank line, which causes 327 # addr2line to emit a (??,??:0) tuple that we use as a boundary marker. 328 self._proc.stdin.write('\n') 329 self._proc.stdin.flush() 330 331 def _ProcessSymbolOutput(self, lines): 332 """Parses an addr2line symbol output and triggers the client callback.""" 333 (_, callback_arg, _) = self._request_queue.popleft() 334 self.queue_size -= 1 335 336 innermost_sym_info = None 337 sym_info = None 338 for (line1, line2) in lines: 339 prev_sym_info = sym_info 340 name = line1 if not line1.startswith('?') else None 341 source_path = None 342 source_line = None 343 m = ELFSymbolizer.Addr2Line.SYM_ADDR_RE.match(line2) 344 if m: 345 if not m.group(1).startswith('?'): 346 source_path = m.group(1) 347 if not m.group(2).startswith('?'): 348 source_line = int(m.group(2)) 349 else: 350 logging.warning('Got invalid symbol path from addr2line: %s', line2) 351 352 # In case disambiguation is on, and needed 353 was_ambiguous = False 354 disambiguated = False 355 if self._symbolizer.disambiguate: 356 if source_path and not posixpath.isabs(source_path): 357 path = self._symbolizer.disambiguation_table.get(source_path) 358 was_ambiguous = True 359 disambiguated = path is not None 360 source_path = path if disambiguated else source_path 361 362 # Use absolute paths (so that paths are consistent, as disambiguation 363 # uses absolute paths) 364 if source_path and not was_ambiguous: 365 source_path = os.path.abspath(source_path) 366 367 if source_path and self._symbolizer.strip_base_path: 368 # Strip the base path 369 source_path = re.sub('^' + self._symbolizer.strip_base_path, 370 self._symbolizer.source_root_path or '', source_path) 371 372 sym_info = ELFSymbolInfo(name, source_path, source_line, was_ambiguous, 373 disambiguated) 374 if prev_sym_info: 375 prev_sym_info.inlined_by = sym_info 376 if not innermost_sym_info: 377 innermost_sym_info = sym_info 378 379 self._processed_symbols_count += 1 380 self._symbolizer.callback(innermost_sym_info, callback_arg) 381 382 def _RestartAddr2LineProcess(self): 383 if self._proc: 384 self.Terminate() 385 386 # The only reason of existence of this Queue (and the corresponding 387 # Thread below) is the lack of a subprocess.stdout.poll_avail_lines(). 388 # Essentially this is a pipe able to extract a couple of lines atomically. 389 self._out_queue = Queue.Queue() 390 391 # Start the underlying addr2line process in line buffered mode. 392 393 cmd = [self._symbolizer.addr2line_path, '--functions', '--demangle', 394 '--exe=' + self._symbolizer.elf_file_path] 395 if self._symbolizer.inlines: 396 cmd += ['--inlines'] 397 self._proc = subprocess.Popen(cmd, bufsize=1, stdout=subprocess.PIPE, 398 stdin=subprocess.PIPE, stderr=sys.stderr, close_fds=True) 399 400 # Start the poller thread, which simply moves atomically the lines read 401 # from the addr2line's stdout to the |_out_queue|. 402 self._thread = threading.Thread( 403 target=ELFSymbolizer.Addr2Line.StdoutReaderThread, 404 args=(self._proc.stdout, self._out_queue, self._symbolizer.inlines)) 405 self._thread.daemon = True # Don't prevent early process exit. 406 self._thread.start() 407 408 self._processed_symbols_count = 0 409 410 # Replay the pending requests on the new process (only for the case 411 # of a hung addr2line timing out during the game). 412 for (addr, _, _) in self._request_queue: 413 self._WriteToA2lStdin(addr) 414 415 @staticmethod 416 def StdoutReaderThread(process_pipe, queue, inlines): 417 """The poller thread fn, which moves the addr2line stdout to the |queue|. 418 419 This is the only piece of code not running on the main thread. It merely 420 writes to a Queue, which is thread-safe. In the case of inlines, it 421 detects the ??,??:0 marker and sends the lines atomically, such that the 422 main thread always receives all the lines corresponding to one symbol in 423 one shot.""" 424 try: 425 lines_for_one_symbol = [] 426 while True: 427 line1 = process_pipe.readline().rstrip('\r\n') 428 line2 = process_pipe.readline().rstrip('\r\n') 429 if not line1 or not line2: 430 break 431 inline_has_more_lines = inlines and (len(lines_for_one_symbol) == 0 or 432 (line1 != '??' and line2 != '??:0')) 433 if not inlines or inline_has_more_lines: 434 lines_for_one_symbol += [(line1, line2)] 435 if inline_has_more_lines: 436 continue 437 queue.put(lines_for_one_symbol) 438 lines_for_one_symbol = [] 439 process_pipe.close() 440 441 # Every addr2line processes will die at some point, please die silently. 442 except (IOError, OSError): 443 pass 444 445 @property 446 def first_request_id(self): 447 """Returns the request_id of the oldest pending request in the queue.""" 448 return self._request_queue[0][2] if self._request_queue else 0 449 450 451class ELFSymbolInfo(object): 452 """The result of the symbolization passed as first arg. of each callback.""" 453 454 def __init__(self, name, source_path, source_line, was_ambiguous=False, 455 disambiguated=False): 456 """All the fields here can be None (if addr2line replies with '??').""" 457 self.name = name 458 self.source_path = source_path 459 self.source_line = source_line 460 # In the case of |inlines|=True, the |inlined_by| points to the outer 461 # function inlining the current one (and so on, to form a chain). 462 self.inlined_by = None 463 self.disambiguated = disambiguated 464 self.was_ambiguous = was_ambiguous 465 466 def __str__(self): 467 return '%s [%s:%d]' % ( 468 self.name or '??', self.source_path or '??', self.source_line or 0) 469