1# Copyright 2014 The Chromium Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5"""Background tasks for the www_server module. 6 7This module has the logic for handling background tasks for the www frontend. 8Long term actions (like periodic tracing), cannot be served synchronously in the 9context of a /ajax/endpoint request (would timeout HTTP). Instead, for such long 10operations, an instance of |BackgroundTask| is created here and the server 11returns just its id. The client can later poll the status of the asynchronous 12task to check for its progress. 13 14From a technical viewpoint, each background task is just a python subprocess 15which communicates its progress updates through a Queue. The messages enqueued 16are tuples with the following format: (completion_ratio%, 'message string'). 17""" 18 19import datetime 20import multiprocessing 21import Queue 22import time 23 24from memory_inspector.core import backends 25from memory_inspector.data import file_storage 26 27 28_tasks = {} #id (int) -> |BackgroundTask| instance. 29 30 31def StartTracer(process, storage_path, interval, count, trace_native_heap): 32 assert(isinstance(process, backends.Process)) 33 task = BackgroundTask( 34 TracerMain_, 35 storage_path=storage_path, 36 backend_name=process.device.backend.name, 37 device_id=process.device.id, 38 pid=process.pid, 39 interval=interval, 40 count=count, 41 trace_native_heap=trace_native_heap) 42 task.start() 43 _tasks[task.pid] = task 44 return task.pid 45 46 47def Get(task_id): 48 return _tasks.get(task_id) 49 50 51def TerminateAll(): 52 for proc in _tasks.itervalues(): 53 if proc.is_alive(): 54 proc.terminate() 55 _tasks.clear() 56 57 58def TracerMain_(log, storage_path, backend_name, device_id, pid, interval, 59 count, trace_native_heap): 60 """Entry point for the background periodic tracer task.""" 61 # Initialize storage. 62 storage = file_storage.Storage(storage_path) 63 64 # Initialize the backend. 65 backend = backends.GetBackend(backend_name) 66 for k, v in storage.LoadSettings(backend_name).iteritems(): 67 backend.settings[k] = v 68 69 # Initialize the device. 70 device = backends.GetDevice(backend_name, device_id) 71 for k, v in storage.LoadSettings(device_id).iteritems(): 72 device.settings[k] = v 73 74 # Start periodic tracing. 75 process = device.GetProcess(pid) 76 log.put((1, 'Starting trace (%d dumps x %s s.). Device: %s, process: %s' % ( 77 count, interval, device.name, process.name))) 78 datetime_str = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M') 79 archive_name = '%s - %s - %s' % (datetime_str, device.name, process.name) 80 archive = storage.OpenArchive(archive_name, create=True) 81 heaps_to_symbolize = [] 82 83 for i in xrange(1, count + 1): # [1, count] range is easier to handle. 84 process = device.GetProcess(pid) 85 if not process: 86 log.put((100, 'Process %d died.' % pid)) 87 return 1 88 # Calculate the completion rate proportionally to 80%. We keep the remaining 89 # 20% for the final symbolization step (just an approximate estimation). 90 completion = 80 * i / count 91 log.put((completion, 'Dumping trace %d of %d' % (i, count))) 92 archive.StartNewSnapshot() 93 mmaps = process.DumpMemoryMaps() 94 log.put((completion, 'Dumped %d memory maps' % len(mmaps))) 95 archive.StoreMemMaps(mmaps) 96 if trace_native_heap: 97 nheap = process.DumpNativeHeap() 98 log.put((completion, 'Dumped %d native allocs' % len(nheap.allocations))) 99 archive.StoreNativeHeap(nheap) 100 heaps_to_symbolize += [nheap] 101 102 if i < count: 103 time.sleep(interval) 104 105 log.put((90, 'Symbolizing')) 106 symbols = backend.ExtractSymbols(heaps_to_symbolize, 107 device.settings['native_symbol_paths'] or '') 108 109 expected_symbols_count = len(set.union( 110 *[set(x.stack_frames.iterkeys()) for x in heaps_to_symbolize])) 111 log.put((99, 'Symbolization complete. Got %d symbols (%.1f%%).' % ( 112 len(symbols), 100.0 * len(symbols) / expected_symbols_count))) 113 114 archive.StoreSymbols(symbols) 115 116 log.put((100, 'Trace complete.')) 117 return 0 118 119 120class BackgroundTask(multiprocessing.Process): 121 def __init__(self, entry_point, *args, **kwargs): 122 self._log_queue = multiprocessing.Queue() 123 self._progress_log = [] # A list of tuples [(50%, 'msg1'), (100%, 'msg2')]. 124 super(BackgroundTask, self).__init__( 125 target=entry_point, 126 args=((self._log_queue,) + args), # Just propagate all args. 127 kwargs=kwargs) 128 129 def GetProgress(self): 130 """ Returns a tuple (completion_rate, message). """ 131 while True: 132 try: 133 self._progress_log += [self._log_queue.get(block=False)] 134 except Queue.Empty: 135 break 136 if not self.is_alive() and self.exitcode != 0: 137 return self._progress_log + [(100, 'Failed with code %d' % self.exitcode)] 138 return self._progress_log