• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2023 The Chromium Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5from abc import ABC, abstractmethod
6import logging
7import subprocess
8import threading
9import time
10import uuid
11
12from devil.utils import reraiser_thread
13
14
class ExpensiveLineTransformer(ABC):
  """Base class for a line transformer backed by a long-lived subprocess.

  Input lines are written to the child's stdin and the transformed lines are
  read back from its stdout. Subclasses provide |name| (used as a prefix in log
  messages) and |command| (passed to subprocess.Popen). The constructor does
  not launch anything; call start() once the subclass is fully initialized.
  """

  def __init__(self, process_start_timeout, minimum_timeout, per_line_timeout):
    """Stores the timeout configuration.

    Args:
      process_start_timeout: Seconds of extra allowance granted to a
        TransformLines() call; reduced by the time elapsed since start().
      minimum_timeout: Lower bound, in seconds, for the per-call allowance.
      per_line_timeout: Seconds allowed per input line.
    """
    self._process_start_timeout = process_start_timeout
    self._minimum_timeout = minimum_timeout
    self._per_line_timeout = per_line_timeout
    self._started = False
    # Allow only one thread to call TransformLines() at a time.
    self._lock = threading.Lock()
    # Ensure that only one thread attempts to kill self._proc in Close().
    self._close_lock = threading.Lock()
    self._closed_called = False
    # Assign to None so that attribute exists if Popen() throws.
    self._proc = None
    # Set by start(); used to compute the remaining start-up allowance.
    self._proc_start_time = None

  def start(self):
    """Launches the subprocess described by |command|.

    Kept separate from __init__ so that descendant classes can finish their
    own initialization first. Logs an error and does nothing if the process
    was already started or if |command| is empty.
    """
    if self._started:
      logging.error('%s: Trying to start an already started command', self.name)
      return

    self._proc_start_time = time.time()

    if not self.command:
      logging.error('%s: No command available', self.name)
      return

    self._proc = subprocess.Popen(self.command,
                                  bufsize=1,
                                  stdin=subprocess.PIPE,
                                  stdout=subprocess.PIPE,
                                  universal_newlines=True,
                                  close_fds=True)
    self._started = True

  def IsClosed(self):
    """Returns True if the process was never started, was closed, or exited."""
    if not self._started or self._closed_called:
      return True
    # Popen.returncode is only refreshed by poll()/wait(); reading the
    # attribute alone would never notice a child that died on its own.
    return self._proc.poll() is not None

  def IsBusy(self):
    """Returns True while some thread is inside TransformLines()."""
    return self._lock.locked()

  def IsReady(self):
    """Returns True if the process is usable and currently idle."""
    return not self.IsClosed() and not self.IsBusy()

  def TransformLines(self, lines):
    """Transforms the given lines by piping them through the subprocess.

    If anything goes wrong (process crashes, timeout, etc), returns |lines|.

    Args:
      lines: A list of strings without trailing newlines.

    Returns:
      A list of strings without trailing newlines.
    """
    if not lines:
      return []

    # The output may contain more lines than the input, so the reader keeps
    # going until it sees this marker line echoed back.
    eof_line = self.getEofLine()
    out_lines = []
    # Set only when the marker was read, i.e. the output is complete.
    saw_eof_line = threading.Event()

    def _reader():
      while True:
        line = self._proc.stdout.readline()
        # readline() returns an empty string once the pipe reaches EOF.
        if not line:
          break
        line = line[:-1]
        if line == eof_line:
          saw_eof_line.set()
          break
        out_lines.append(line)

    if self.IsBusy():
      logging.warning('%s: Having to wait for transformation.', self.name)

    # Allow only one thread to operate at a time.
    with self._lock:
      if self.IsClosed():
        if self._started and not self._closed_called:
          logging.warning('%s: Process exited with code=%d.', self.name,
                          self._proc.returncode)
          self.Close()
        return lines

      reader_thread = reraiser_thread.ReraiserThread(_reader)
      reader_thread.start()

      try:
        self._proc.stdin.write('\n'.join(lines))
        self._proc.stdin.write('\n{}\n'.format(eof_line))
        self._proc.stdin.flush()
        time_since_proc_start = time.time() - self._proc_start_time
        timeout = (max(0, self._process_start_timeout - time_since_proc_start) +
                   max(self._minimum_timeout,
                       len(lines) * self._per_line_timeout))
        reader_thread.join(timeout)
        if self._closed_called:
          logging.warning('%s: Close() called by another thread during join().',
                          self.name)
          return lines
        if reader_thread.is_alive():
          logging.error('%s: Timed out after %f seconds with input:', self.name,
                        timeout)
          for line in lines:
            logging.error(line)
          logging.error(eof_line)
          logging.error('%s: End of timed out input.', self.name)
          logging.error('%s: Timed out output was:', self.name)
          for line in out_lines:
            logging.error(line)
          logging.error('%s: End of timed out output.', self.name)
          self.Close()
          return lines
        if not saw_eof_line.is_set():
          # stdout reached EOF before the marker came back, so |out_lines| is
          # truncated. Fall back to the untouched input.
          logging.warning('%s: Output ended before the end-of-input marker.',
                          self.name)
          self.Close()
          return lines
        return out_lines
      except OSError:
        # Covers IOError/BrokenPipeError raised while writing to a dead child.
        logging.exception('%s: Exception during transformation', self.name)
        self.Close()
        return lines

  def Close(self):
    """Kills and reaps the subprocess. Safe to call more than once."""
    with self._close_lock:
      needs_closing = self._started and not self._closed_called
      self._closed_called = True

    if not needs_closing:
      return

    try:
      self._proc.stdin.close()
    except OSError:
      # Closing flushes buffered input, which fails if the child is already
      # gone. That must not prevent the kill()/wait() below.
      logging.debug('%s: Ignoring error while closing stdin.', self.name,
                    exc_info=True)
    self._proc.kill()
    self._proc.wait()

  def __del__(self):
    # Attributes may be missing if __init__ raised or was never run, and
    # self._proc is None when Popen() fails or start() was never called.
    if getattr(self, '_proc', None) and not getattr(self, '_closed_called',
                                                    True):
      logging.error('%s: Forgot to Close()', self.name)
      self.Close()

  @property
  @abstractmethod
  def name(self):
    """Label used as the prefix of this transformer's log messages."""
    ...

  @property
  @abstractmethod
  def command(self):
    """Command passed to subprocess.Popen; a falsy value prevents start()."""
    ...

  @staticmethod
  def getEofLine():
    """Returns a fresh, unique marker line that terminates one input batch."""
    # Use a format that will be considered a "useful line" without modifying its
    # output by third_party/android_platform/development/scripts/stack_core.py
    return "Generic useful log header: '{}'".format(uuid.uuid4().hex)
174
175
class ExpensiveLineTransformerPool(ABC):
  """A fixed-size pool of transformers created by CreateTransformer().

  Closed transformers are replaced on demand, up to |max_restarts| times in
  total. Once that budget is used up the pool either hands the input back
  unchanged or raises, depending on |passthrough_on_failure|. Usable as a
  context manager; leaving the `with` block calls Close().
  """

  def __init__(self, max_restarts, pool_size, passthrough_on_failure):
    """Creates |pool_size| transformers.

    Args:
      max_restarts: Total number of replacements after which the pool is
        considered broken.
      pool_size: Number of transformers to create.
      passthrough_on_failure: If true, a broken pool returns its input
        unchanged; otherwise it raises.
    """
    self._max_restarts = max_restarts
    self._pool = [self.CreateTransformer() for _ in range(pool_size)]
    self._passthrough_on_failure = passthrough_on_failure
    # Allow only one thread to select from the pool at a time.
    self._lock = threading.Lock()
    self._num_restarts = 0

  def __enter__(self):
    # Return the pool so that `with Pool(...) as pool:` binds something useful.
    return self

  def __exit__(self, *args):
    self.Close()

  def TransformLines(self, lines):
    """Transforms |lines| using one transformer from the pool.

    Args:
      lines: The lines to hand to the selected transformer.

    Returns:
      The selected transformer's result, or |lines| unchanged when the pool is
      broken and passthrough_on_failure is set.

    Raises:
      Exception: The pool is broken and passthrough_on_failure is not set.
    """
    with self._lock:
      assert self._pool, 'TransformLines() called on a closed Pool.'

      # Transformation is broken: the restart budget is already used up.
      if self._num_restarts == self._max_restarts:
        if self._passthrough_on_failure:
          return lines
        raise Exception('%s is broken.' % self.name)

      # Restart any closed transformer.
      for i, d in enumerate(self._pool):
        if d.IsClosed():
          logging.warning('%s: Restarting closed instance.', self.name)
          # Release whatever the dead instance still holds before dropping it.
          d.Close()
          self._pool[i] = self.CreateTransformer()
          self._num_restarts += 1
          if self._num_restarts == self._max_restarts:
            logging.warning('%s: MAX_RESTARTS reached.', self.name)
            if self._passthrough_on_failure:
              return lines
            raise Exception('%s is broken.' % self.name)

      # Prefer an idle transformer; otherwise queue behind the first one.
      selected = next((x for x in self._pool if x.IsReady()), self._pool[0])
      # Rotate the order so that next caller will not choose the same one.
      self._pool.remove(selected)
      self._pool.append(selected)

    # Run outside the pool lock so other callers can pick other transformers.
    return selected.TransformLines(lines)

  def Close(self):
    """Closes every transformer. Safe to call more than once."""
    with self._lock:
      # self._pool is None after a previous Close().
      for d in self._pool or ():
        d.Close()
      self._pool = None

  @abstractmethod
  def CreateTransformer(self):
    """Returns a new transformer instance for the pool."""
    ...

  @property
  @abstractmethod
  def name(self):
    """Label used as the prefix of this pool's log and error messages."""
    ...
234