# Copyright 2023 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Base classes for piping lines through long-running helper processes."""

from abc import ABC, abstractmethod
import logging
import subprocess
import threading
import time
import uuid

from devil.utils import reraiser_thread


class ExpensiveLineTransformer(ABC):
  """Feeds batches of lines to a helper subprocess and collects its output.

  Subclasses provide |name| (used as the prefix of every log message) and
  |command| (the argument list handed to subprocess.Popen). The process is
  launched by start(), not by __init__().
  """

  def __init__(self, process_start_timeout, minimum_timeout, per_line_timeout):
    """Initializes the bookkeeping state; does not launch the process.

    Args:
      process_start_timeout: Seconds granted to the process for starting up,
        measured from start(). Whatever part of it has not yet elapsed is
        added to the timeout of a TransformLines() call.
      minimum_timeout: Lower bound, in seconds, for the per-call timeout.
      per_line_timeout: Seconds of timeout granted per input line.
    """
    self._process_start_timeout = process_start_timeout
    self._minimum_timeout = minimum_timeout
    self._per_line_timeout = per_line_timeout
    self._started = False
    # Allow only one thread to call TransformLines() at a time.
    self._lock = threading.Lock()
    # Ensure that only one thread attempts to kill self._proc in Close().
    self._close_lock = threading.Lock()
    self._closed_called = False
    # Assign to None so that attribute exists if Popen() throws.
    self._proc = None
    # Set by start(); used to work out how much start-up time is left.
    self._proc_start_time = None

  def start(self):
    """Launches the helper process described by |command|.

    Kept separate from __init__() so that descendant classes can finish their
    own initialization first. Logs an error and does nothing else when the
    process was already started or when |command| is empty.
    """
    if self._started:
      logging.error('%s: Trying to start an already started command',
                    self.name)
      return

    # Start process eagerly to hide start-up latency.
    self._proc_start_time = time.time()

    if not self.command:
      logging.error('%s: No command available', self.name)
      return

    self._proc = subprocess.Popen(self.command,
                                  bufsize=1,
                                  stdin=subprocess.PIPE,
                                  stdout=subprocess.PIPE,
                                  universal_newlines=True,
                                  close_fds=True)
    self._started = True

  def IsClosed(self):
    """Returns True if never started, Close()d, or the process has exited."""
    if not self._started or self._closed_called:
      return True
    # Popen.returncode is only refreshed by poll()/wait(), so poll explicitly;
    # reading the attribute alone would never notice a crashed process.
    return self._proc.poll() is not None

  def IsBusy(self):
    """Returns True while some thread is inside TransformLines()."""
    return self._lock.locked()

  def IsReady(self):
    """Returns True if started, not closed and not currently busy."""
    return self._started and not self.IsClosed() and not self.IsBusy()

  def TransformLines(self, lines):
    """Symbolizes names found in the given lines.

    If anything goes wrong (process crashes, timeout, etc), returns |lines|.

    Args:
      lines: A list of strings without trailing newlines.

    Returns:
      A list of strings without trailing newlines.
    """
    if not lines:
      return []

    # Symbolized output can contain more lines than the input, as the
    # symbolized stacktraces will be added. To account for the extra output
    # lines, keep reading until this eof_line token is reached. It uses a
    # format that will be considered a "useful line" without modifying its
    # output by third_party/android_platform/development/scripts/stack_core.py
    eof_line = self.getEofLine()
    out_lines = []
    # Set only when the reader really sees |eof_line|. If the reader stops for
    # any other reason the output is incomplete and must not be returned.
    eof_seen = threading.Event()

    def _reader():
      while True:
        line = self._proc.stdout.readline()
        # readline() returns an empty string at EOF (the pipe was closed).
        if not line:
          break
        line = line[:-1]
        if line == eof_line:
          eof_seen.set()
          break
        out_lines.append(line)

    if self.IsBusy():
      logging.warning('%s: Having to wait for transformation.', self.name)

    # Allow only one thread to operate at a time.
    with self._lock:
      if self.IsClosed():
        if self._started and not self._closed_called:
          logging.warning('%s: Process exited with code=%d.', self.name,
                          self._proc.returncode)
        self.Close()
        return lines

      reader_thread = reraiser_thread.ReraiserThread(_reader)
      reader_thread.start()

      try:
        self._proc.stdin.write('\n'.join(lines))
        self._proc.stdin.write('\n{}\n'.format(eof_line))
        self._proc.stdin.flush()
        time_since_proc_start = time.time() - self._proc_start_time
        startup_allowance = max(
            0, self._process_start_timeout - time_since_proc_start)
        timeout = startup_allowance + max(
            self._minimum_timeout, len(lines) * self._per_line_timeout)
        reader_thread.join(timeout)
        if self._closed_called:
          logging.warning('%s: Close() called by another thread during join().',
                          self.name)
          return lines
        if reader_thread.is_alive():
          logging.error('%s: Timed out after %f seconds with input:', self.name,
                        timeout)
          for l in lines:
            logging.error(l)
          logging.error(eof_line)
          logging.error('%s: End of timed out input.', self.name)
          logging.error('%s: Timed out output was:', self.name)
          for l in out_lines:
            logging.error(l)
          logging.error('%s: End of timed out output.', self.name)
          self.Close()
          return lines
        if not eof_seen.is_set():
          # The reader stopped without seeing the marker, i.e. the process
          # closed its stdout (typically by crashing) part-way through.
          logging.warning('%s: Output ended before the end marker was seen.',
                          self.name)
          self.Close()
          return lines
        return out_lines
      except (IOError, ValueError):
        # IOError: the pipe broke. ValueError: stdin was already closed by a
        # concurrent Close().
        logging.exception('%s: Exception during transformation', self.name)
        self.Close()
        return lines

  def Close(self):
    """Kills and reaps the helper process. Safe to call more than once."""
    with self._close_lock:
      needs_closing = self._started and not self._closed_called
      self._closed_called = True

    if needs_closing:
      try:
        self._proc.stdin.close()
      except IOError:
        # close() flushes buffered input, which fails with a broken pipe when
        # the process is already gone. Carry on so the process still gets
        # reaped below.
        logging.debug('%s: Ignoring error while closing stdin.', self.name)
      self._proc.kill()
      self._proc.wait()

  def __del__(self):
    # getattr() guards against __init__() never having run (e.g. a subclass
    # constructor raised first). self._proc is None when Popen() fails.
    if (not getattr(self, '_closed_called', True)
        and getattr(self, '_proc', None)):
      logging.error('%s: Forgot to Close()', self.name)
      self.Close()

  @property
  @abstractmethod
  def name(self):
    """Label used as the prefix of this transformer's log messages."""
    ...

  @property
  @abstractmethod
  def command(self):
    """Argument list passed to subprocess.Popen by start()."""
    ...

  @staticmethod
  def getEofLine():
    """Returns a fresh, unique marker line that terminates one batch."""
    # Use a format that will be considered a "useful line" without modifying its
    # output by third_party/android_platform/development/scripts/stack_core.py
    return "Generic useful log header: '{}'".format(uuid.uuid4().hex)


class ExpensiveLineTransformerPool(ABC):
  """A fixed-size pool of transformers that restarts closed instances.

  Subclasses provide CreateTransformer() and |name|.
  """

  def __init__(self, max_restarts, pool_size, passthrough_on_failure):
    """Creates |pool_size| transformers via CreateTransformer().

    Args:
      max_restarts: Number of restarts after which the pool is considered
        broken.
      pool_size: Number of transformers to create.
      passthrough_on_failure: When the pool is broken, return the input
        unchanged if True, otherwise raise.
    """
    self._max_restarts = max_restarts
    self._pool = [self.CreateTransformer() for _ in range(pool_size)]
    self._passthrough_on_failure = passthrough_on_failure
    # Allow only one thread to select from the pool at a time.
    self._lock = threading.Lock()
    self._num_restarts = 0

  def __enter__(self):
    # Return the pool so that `with Pool(...) as pool:` binds a usable object.
    return self

  def __exit__(self, *args):
    self.Close()

  def TransformLines(self, lines):
    """Transforms |lines| using one transformer from the pool.

    Args:
      lines: A list of strings without trailing newlines.

    Returns:
      The selected transformer's output, or |lines| unchanged when the pool is
      broken and passthrough_on_failure is set.

    Raises:
      Exception: The pool is broken and passthrough_on_failure is not set.
    """
    with self._lock:
      assert self._pool, 'TransformLines() called on a closed Pool.'

      # transformation is broken.
      if self._num_restarts == self._max_restarts:
        if self._passthrough_on_failure:
          return lines
        raise Exception('%s is broken.' % self.name)

      # Restart any closed transformer.
      for i, d in enumerate(self._pool):
        if d.IsClosed():
          logging.warning('%s: Restarting closed instance.', self.name)
          # Reap the old process if it exited on its own; Close() is a no-op
          # for an instance that was already closed.
          d.Close()
          self._pool[i] = self.CreateTransformer()
          self._num_restarts += 1
          if self._num_restarts == self._max_restarts:
            logging.warning('%s: MAX_RESTARTS reached.', self.name)
            if self._passthrough_on_failure:
              return lines
            raise Exception('%s is broken.' % self.name)

      selected = next((x for x in self._pool if x.IsReady()), self._pool[0])
      # Rotate the order so that next caller will not choose the same one.
      self._pool.remove(selected)
      self._pool.append(selected)

    # Run outside the pool lock so other threads can pick other transformers.
    return selected.TransformLines(lines)

  def Close(self):
    """Closes every transformer. Safe to call more than once."""
    with self._lock:
      # self._pool is None once the pool has been closed.
      for d in self._pool or ():
        d.Close()
      self._pool = None

  @abstractmethod
  def CreateTransformer(self):
    """Returns a new transformer instance for the pool."""
    ...

  @property
  @abstractmethod
  def name(self):
    """Label used as the prefix of this pool's log messages."""
    ...