• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# Copyright 2021 The Chromium Authors
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5"""Creates a server to offload non-critical-path GN targets."""
6
7from __future__ import annotations
8
9import argparse
10import json
11import os
12import queue
13import shutil
14import socket
15import subprocess
16import sys
17import threading
18from typing import Callable, Dict, List, Optional, Tuple
19
20sys.path.append(os.path.join(os.path.dirname(__file__), 'gyp'))
21from util import server_utils
22
23
def log(msg: str, *, end: str = ''):
  """Prints a one-line status message that overwrites the previous one.

  The message is prefixed with the aggregate task stats and shrunk to fit the
  terminal width, so it always occupies a single line. Shrinking keeps the
  first 2 characters plus as much of the tail as fits, joined by '...'.

  Args:
    msg: The status message to print.
    end: Line ending passed to print(). Defaults to '' so that the next call,
      which starts with a carriage return, overwrites this line just like
      ninja's output.
  """
  width = shutil.get_terminal_size().columns
  prefix = f'[{TaskStats.prefix()}] '
  max_msg_width = width - len(prefix)
  if len(msg) > max_msg_width:
    # 5 columns go to the 2-char head and the '...' ellipsis. Clamp at zero:
    # on a very narrow terminal a non-positive count would make msg[-n:]
    # return the whole message (n == 0) or drop its head (n < 0).
    length_to_show = max(0, max_msg_width - 5)
    msg = f'{msg[:2]}...{msg[len(msg) - length_to_show:]}'
  # \r to return the carriage to the beginning of line.
  # \033[K to replace the normal \n to erase until the end of the line.
  print(f'\r{prefix}{msg}\033[K', end=end, flush=True)
38
39
class TaskStats:
  """Aggregate counters for all tasks, shared across threads.

  All state lives on the class itself; it is never instantiated. Every
  counter except _total_tasks is updated under _lock.
  """
  _num_processes = 0  # Task subprocesses currently running.
  _completed_tasks = 0  # Tasks that have been reported as complete.
  _total_tasks = 0  # Tasks ever queued.
  _lock = threading.Lock()

  @classmethod
  def no_running_processes(cls):
    """Returns True when no task subprocess is currently running."""
    return not cls._num_processes

  @classmethod
  def add_task(cls):
    """Records that one more task was queued."""
    # Only ever invoked from the main thread, so the lock is not taken here.
    cls._total_tasks = cls._total_tasks + 1

  @classmethod
  def _change_process_count(cls, delta: int):
    """Adjusts the running-process counter by |delta| under the lock."""
    with cls._lock:
      cls._num_processes += delta

  @classmethod
  def add_process(cls):
    """Records that a task subprocess has been launched."""
    cls._change_process_count(1)

  @classmethod
  def remove_process(cls):
    """Records that a task subprocess has exited."""
    cls._change_process_count(-1)

  @classmethod
  def complete_task(cls):
    """Records that one more task has been reported as complete."""
    with cls._lock:
      cls._completed_tasks = cls._completed_tasks + 1

  @classmethod
  def prefix(cls):
    """Returns a ninja-like status string such as '3 processes, 6/734'.

    Ninja's own prefix also shows timing and rate, e.g.
    [205 processes, 6/734 @ 6.5/s : 0.922s ]. Those are left out because the
    build server always runs in the background and uses idle priority for its
    tasks, so they are not informative.
    """
    with cls._lock:
      running = cls._num_processes
      noun = 'processes' if running != 1 else 'process'
      return f'{running} {noun}, {cls._completed_tasks}/{cls._total_tasks}'
81
82
class TaskManager:
  """Class to encapsulate a threadsafe queue and handle deactivating it.

  Queued tasks are started only while the machine is lightly loaded (or when
  nothing is running), at most two per call, so offloaded work stays in the
  background.
  """

  def __init__(self):
    # Pending tasks; SimpleQueue may be used from several threads.
    self._queue: queue.SimpleQueue[Task] = queue.SimpleQueue()
    # Once True, no new tasks may be added and none are started.
    self._deactivated = False

  def add_task(self, task: Task):
    """Queues |task|, then starts queued tasks if the load allows it."""
    assert not self._deactivated
    TaskStats.add_task()
    self._queue.put(task)
    log(f'QUEUED {task.name}')
    self._maybe_start_tasks()

  def deactivate(self):
    """Stops starting new tasks and terminates every task still queued."""
    self._deactivated = True
    while not self._queue.empty():
      try:
        task = self._queue.get_nowait()
      except queue.Empty:
        # A _maybe_start_tasks call already running on a task's completion
        # thread may drain the queue between empty() and get_nowait().
        return
      task.terminate()

  @staticmethod
  def _num_running_processes():
    """Returns the system-wide 'procs_running' value from /proc/stat."""
    with open('/proc/stat') as f:
      for line in f:
        if line.startswith('procs_running'):
          return int(line.rstrip().split()[1])
    assert False, 'Could not read /proc/stat'
    return 0

  def _maybe_start_tasks(self):
    """Starts up to two queued tasks if the system load allows it.

    Called after a task is queued and, through Task.start's completion
    callback, after a running task finishes.
    """
    if self._deactivated:
      return
    # os.cpu_count() returns None when the count cannot be determined. Fall
    # back to 1 so the load comparison below cannot raise a TypeError.
    cpu_count = os.cpu_count() or 1
    # Include load avg so that a small dip in the number of currently running
    # processes will not cause new tasks to be started while the overall load is
    # heavy.
    cur_load = max(self._num_running_processes(), os.getloadavg()[0])
    num_started = 0
    # Always start a task if we don't have any running, so that all tasks are
    # eventually finished. Try starting up tasks when the overall load is light.
    # Limit to at most 2 new tasks to prevent ramping up too fast. There is a
    # chance where multiple threads call _maybe_start_tasks and each gets to
    # spawn up to 2 new tasks, but since the only downside is some build tasks
    # get worked on earlier rather than later, it is not worth mitigating.
    while num_started < 2 and (TaskStats.no_running_processes()
                               or num_started + cur_load < cpu_count):
      try:
        next_task = self._queue.get_nowait()
      except queue.Empty:
        return
      # start() returns 0 for tasks that were terminated while queued.
      num_started += next_task.start(self._maybe_start_tasks)
136
137
138# TODO(wnwen): Break this into Request (encapsulating what ninja sends) and Task
139#              when a Request starts to be run. This would eliminate ambiguity
140#              about when and whether _proc/_thread are initialized.
class Task:
  """Class to represent one task and operations on it.

  A task wraps one command received by the server. It is started at most once
  (when popped off the TaskManager queue) and may be terminated at any time.
  _complete() reports the outcome. On failure, or on termination that is not
  a replacement, it deletes the stamp file so ninja treats the target as dirty.
  """

  def __init__(self, name: str, cwd: str, cmd: List[str], stamp_file: str):
    self.name = name
    # Directory the command runs in; stamp_file is resolved relative to it.
    self.cwd = cwd
    self.cmd = cmd
    self.stamp_file = stamp_file
    # Set once, under _lock, by terminate() or by a failed start().
    self._terminated = False
    # True when terminated because a newer request replaced this task.
    self._replaced = False
    # Guards _terminated and the one-time assignment of _proc and _thread.
    self._lock = threading.Lock()
    self._proc: Optional[subprocess.Popen] = None
    self._thread: Optional[threading.Thread] = None
    self._return_code: Optional[int] = None

  @property
  def key(self):
    """(cwd, name); a newer request with the same key replaces this task."""
    return (self.cwd, self.name)

  def start(self, on_complete_callback: Callable[[], None]) -> int:
    """Starts the task if it has not already been terminated.

    Returns the number of processes that have been started (0 or 1). This is
    called at most once when the task is popped off the task queue.

    If the process cannot be spawned (for example, the command does not exist
    or is not executable), the task is reported as FAILED immediately and 0 is
    returned. Letting the exception escape would leave the running-process
    count permanently inflated and abort the caller.
    """

    # The environment variable forces the script to actually run in order to
    # avoid infinite recursion.
    env = os.environ.copy()
    env[server_utils.BUILD_SERVER_ENV_VARIABLE] = '1'

    with self._lock:
      if self._terminated:
        return 0
      # Use os.nice(19) to ensure the lowest priority (idle) for these analysis
      # tasks since we want to avoid slowing down the actual build.
      # TODO(wnwen): Use ionice to reduce resource consumption.
      TaskStats.add_process()
      log(f'STARTING {self.name}')
      try:
        # This use of preexec_fn is sufficiently simple, just one os.nice call.
        # pylint: disable=subprocess-popen-preexec-fn
        self._proc = subprocess.Popen(
            self.cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            cwd=self.cwd,
            env=env,
            text=True,
            preexec_fn=lambda: os.nice(19),
        )
      except (OSError, subprocess.SubprocessError) as e:
        TaskStats.remove_process()
        # Report the failure: this prints FAILED and deletes the stamp file
        # because the output is non-empty. Then mark the task terminated, still
        # under _lock, so a later terminate() returns early instead of calling
        # _complete() a second time.
        self._complete(f'Failed to start process: {e}')
        self._terminated = True
        return 0
      self._thread = threading.Thread(
          target=self._complete_when_process_finishes,
          args=(on_complete_callback, ))
      self._thread.start()
      return 1

  def terminate(self, replaced=False):
    """Can be called multiple times to cancel and ignore the task's output.

    Args:
      replaced: True when a newer request for the same target superseded this
        task. In that case _complete() keeps the stamp file.
    """

    with self._lock:
      if self._terminated:
        return
      self._terminated = True
      self._replaced = replaced
    # It is safe to access _proc and _thread outside of _lock since they are
    # only changed by self.start holding _lock when self._terminated is false.
    # Since we have just set self._terminated to true inside of _lock, we know
    # that neither _proc nor _thread will be changed from this point onwards.
    if self._proc:
      self._proc.terminate()
      self._proc.wait()
    # Ensure that self._complete is called either by the thread or by us.
    if self._thread:
      self._thread.join()
    else:
      self._complete()

  def _complete_when_process_finishes(self,
                                      on_complete_callback: Callable[[], None]):
    """Worker-thread body: waits for the process, then reports completion.

    Collects the combined stdout/stderr, records the exit code, updates the
    stats, calls _complete(), and finally invokes |on_complete_callback|
    (TaskManager passes its _maybe_start_tasks).
    """
    assert self._proc
    # We know Popen.communicate will return a str and not a byte since it is
    # constructed with text=True.
    stdout: str = self._proc.communicate()[0]
    self._return_code = self._proc.returncode
    TaskStats.remove_process()
    self._complete(stdout)
    on_complete_callback()

  def _complete(self, stdout: str = ''):
    """Update the user and ninja after the task has run or been terminated.

    This method should only be run once per task. Avoid modifying the task so
    that this method does not need locking.

    Any output, or a non-zero (or missing) return code, counts as a failure.
    """

    TaskStats.complete_task()
    delete_stamp = False
    if self._terminated:
      log(f'TERMINATED {self.name}')
      # When tasks are replaced, avoid deleting the stamp file, context:
      # https://issuetracker.google.com/301961827.
      if not self._replaced:
        delete_stamp = True
    else:
      log(f'FINISHED {self.name}')
      if stdout or self._return_code != 0:
        delete_stamp = True
        # An extra new line is needed since we want to preserve the previous
        # log line. Use a single print so that it is threadsafe.
        # TODO(wnwen): Improve stdout display by parsing over it and moving the
        #              actual error to the bottom. Otherwise long command lines
        #              in the Traceback section obscure the actual error(s).
        print('\n' + '\n'.join([
            f'FAILED: {self.name}',
            f'Return code: {self._return_code}',
            ' '.join(self.cmd),
            stdout,
        ]))

    if delete_stamp:
      # Force ninja to consider failed targets as dirty.
      try:
        os.unlink(os.path.join(self.cwd, self.stamp_file))
      except FileNotFoundError:
        pass
    else:
      # Ninja will rebuild targets when their inputs change even if their stamp
      # file has a later modified time. Thus we do not need to worry about the
      # script being run by the build server updating the mtime incorrectly.
      pass
269
270
def _listen_for_request_data(sock: socket.socket):
  """Yields one decoded JSON request per client connection accepted on |sock|.

  Reads everything a client sends until it closes its end of the connection
  and decodes the bytes as a single JSON document. Connections that close
  without sending anything are skipped. Never returns; it blocks in accept()
  between requests.
  """
  while True:
    connection, _ = sock.accept()
    with connection:
      # recv() yields b'' once the peer has closed the connection.
      chunks = list(iter(lambda: connection.recv(4096), b''))
    if chunks:
      yield json.loads(b''.join(chunks))
283
284
def _process_requests(sock: socket.socket):
  """Runs the server loop, turning every request received on |sock| into a Task.

  A new request for a target that already has a task terminates the older
  task (as a replacement) before queuing the new one. The loop only ends via
  an exception. On the way out, all queued and running tasks are terminated
  so that no child process or completion thread keeps working after the
  server stops. KeyboardInterrupt (Ctrl-C) is the normal way to stop and is
  swallowed; any other exception is re-raised after the cleanup.
  """
  # Since dicts in python can contain anything, explicitly type tasks to help
  # make static type checking more useful.
  tasks: Dict[Tuple[str, str], Task] = {}
  task_manager = TaskManager()
  try:
    log('READY... Remember to set android_static_analysis="build_server" in '
        'args.gn files')
    for data in _listen_for_request_data(sock):
      task = Task(name=data['name'],
                  cwd=data['cwd'],
                  cmd=data['cmd'],
                  stamp_file=data['stamp_file'])
      existing_task = tasks.get(task.key)
      if existing_task:
        existing_task.terminate(replaced=True)
      tasks[task.key] = task
      task_manager.add_task(task)
  except KeyboardInterrupt:
    # Expected way to stop the server; the cleanup below still runs.
    pass
  finally:
    log('STOPPING SERVER...', end='\n')
    # Gracefully shut down the task manager, terminating all queued tasks.
    task_manager.deactivate()
    # Terminate all currently running tasks (already-terminated ones no-op).
    for task in tasks.values():
      task.terminate()
    log('STOPPED', end='\n')
311
312
def main():
  """Entry point: either probes for a running build server or runs one.

  With --fail-if-not-running, tries to connect to the server socket. If that
  fails, it prints setup instructions and returns 1; otherwise it returns 0.
  Without the flag, it binds the server socket and serves requests until
  _process_requests returns, then returns 0.
  """
  parser = argparse.ArgumentParser(description=__doc__)
  parser.add_argument(
      '--fail-if-not-running',
      action='store_true',
      help='Used by GN to fail fast if the build server is not running.')
  options = parser.parse_args()

  if not options.fail_if_not_running:
    with socket.socket(socket.AF_UNIX) as server_sock:
      server_sock.bind(server_utils.SOCKET_ADDRESS)
      server_sock.listen()
      _process_requests(server_sock)
    return 0

  with socket.socket(socket.AF_UNIX) as probe:
    try:
      probe.connect(server_utils.SOCKET_ADDRESS)
    except socket.error:
      print('Build server is not running and '
            'android_static_analysis="build_server" is set.\nPlease run '
            'this command in a separate terminal:\n\n'
            '$ build/android/fast_local_dev_server.py\n')
      return 1
  return 0
337
338
if __name__ == '__main__':
  # Propagate main()'s return value as the process exit status.
  exit_code = main()
  sys.exit(exit_code)
341