• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# Copyright 2021 The Chromium Authors
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
"""Creates a server to offload non-critical-path GN targets."""
6
7from __future__ import annotations
8
9import argparse
10import collections
11import contextlib
12import dataclasses
13import datetime
14import os
15import pathlib
16import re
17import signal
18import shlex
19import shutil
20import socket
21import subprocess
22import sys
23import threading
24import traceback
25import time
26from typing import Callable, Dict, List, Optional, Tuple, IO
27
28sys.path.append(os.path.join(os.path.dirname(__file__), 'gyp'))
29from util import server_utils
30
_SOCKET_TIMEOUT = 60  # seconds

# Logfiles live in the build output directory as `<_LOGFILE_NAME>.<n>`, with
# `.0` being the newest; `_MAX_LOGFILES` bounds how many suffixes are shifted
# when a new logfile is started.
_LOGFILE_NAME = 'buildserver.log'
_MAX_LOGFILES = 6

# Header written at the top of every new logfile. BUILD_ID_RE extracts the
# build id back out of the first line of an existing logfile.
FIRST_LOG_LINE = ('#### Start of log for build: {build_id}\n'
                  '#### CWD: {outdir}\n')
BUILD_ID_RE = re.compile(r'^#### Start of log for build: (?P<build_id>.+)')
41
42
def server_log(msg: str):
  """Prints `msg` to the server's own terminal unless running in quiet mode."""
  if not OptionsManager.is_quiet():
    # Lead with a newline so the message never gets appended to a status line
    # that was printed without a line ending.
    print(f'\n{msg}')
48
49
def print_status(prefix: str, msg: str):
  """Overwrites the current terminal line with `prefix` followed by `msg`.

  Does nothing in quiet mode. When `msg` would not fit in the room left after
  `prefix`, its middle is elided: the first 2 characters, an ellipsis, and as
  much of the tail as fits are kept.
  """
  # No need to also output to the terminal if quiet.
  if OptionsManager.is_quiet():
    return
  width = shutil.get_terminal_size().columns
  max_msg_width = width - len(prefix)
  if len(msg) > max_msg_width:
    # 5 = the 2 leading characters that are kept plus the 3-char ellipsis.
    # Clamp at 0: on a narrow terminal a negative count would slice from the
    # wrong end, and a count of 0 would make msg[-0:] return the whole message.
    length_to_show = max(max_msg_width - 5, 0)
    tail = msg[len(msg) - length_to_show:]
    msg = f'{msg[:2]}...{tail}'
  # \r to return the carriage to the beginning of line.
  # \033[K to replace the normal \n to erase until the end of the line.
  # Avoid the default line ending so the next \r overwrites the same line just
  #     like ninja's output.
  print(f'\r{prefix}{msg}\033[K', end='', flush=True)
66
67
def _exception_hook(exctype: type, exc: Exception, tb):
  """Reports an uncaught exception everywhere, then stops task processing.

  Takes the same arguments as sys.excepthook (presumably it is installed as
  such elsewhere; not visible here). KeyboardInterrupt goes straight to the
  interpreter's default hook.
  """
  # Let KeyboardInterrupt through.
  if issubclass(exctype, KeyboardInterrupt):
    sys.__excepthook__(exctype, exc, tb)
    return
  formatted = ''.join(traceback.format_exception(exctype, exc, tb))
  # Each traceback line gets its own leading newline plus a marker. The final
  # extra newline is there since siso's output often erases the current line.
  report = ''.join('\n⛔' + line for line in formatted.splitlines())
  # Output uncaught exceptions to all live terminals.
  BuildManager.broadcast(report + '\n')
  # Cancel all pending tasks cleanly (i.e. delete stamp files if necessary).
  TaskManager.deactivate()
  # Reset all remote terminal titles.
  BuildManager.update_remote_titles('')
82
83
# Holds the parsed command-line options in one place so they do not have to be
# passed along to, and stored by, every caller.
class OptionsManager:
  _options = None

  @classmethod
  def set_options(cls, options):
    """Stores `options` (expects `quiet` and `no_remote_print` attributes)."""
    cls._options = options

  @classmethod
  def _checked_options(cls):
    # Every accessor requires set_options() to have been called first.
    assert cls._options is not None
    return cls._options

  @classmethod
  def is_quiet(cls):
    """Returns the `quiet` option as stored."""
    return cls._checked_options().quiet

  @classmethod
  def should_remote_print(cls):
    """True unless output to remote (builder) terminals was disabled."""
    return not cls._checked_options().no_remote_print
102
103
class LogfileManager:
  """Hands out one open logfile per build id and rotates older logfiles."""
  _logfiles: dict[str, IO[str]] = {}
  _lock = threading.RLock()

  @classmethod
  def create_logfile(cls, build_id, outdir):
    """Returns an open logfile for `build_id` inside directory `outdir`.

    Resolution order:
      1. A cached handle for `build_id` that is still open.
      2. `<outdir>/<_LOGFILE_NAME>.0`, reopened for appending, when its header
         line names the same build id.
      3. Otherwise existing logfiles are shifted up one suffix and a fresh
         `.0` file is created starting with FIRST_LOG_LINE.

    Logfiles are read and written as UTF-8 because log messages can contain
    non-ASCII characters (e.g. the emoji markers used in broadcast messages).
    """
    with cls._lock:
      cached = cls._logfiles.get(build_id)
      # Build.task_done() closes a build's logfile once the build goes idle.
      # Never hand that closed handle out again, otherwise all further logging
      # for this build id would be silently dropped by Build.log().
      if cached is not None and not cached.closed:
        return cached

      outdir = pathlib.Path(outdir)
      latest_logfile = outdir / f'{_LOGFILE_NAME}.0'

      if latest_logfile.exists():
        # Only the header line matters; do not fail on undecodable bytes that
        # may appear further into the file.
        with latest_logfile.open('rt', encoding='utf-8', errors='replace') as f:
          first_line = f.readline()
        log_build_id = BUILD_ID_RE.search(first_line)
        # If the newest logfile on disk is referencing the same build we are
        # currently processing, we probably crashed previously (or closed the
        # handle) and we should pick up where we left off in the same logfile.
        if log_build_id and log_build_id.group('build_id') == build_id:
          logfile = latest_logfile.open('at', encoding='utf-8')
          cls._logfiles[build_id] = logfile
          return logfile

      # Do the logfile name shift: .{_MAX_LOGFILES - 1} -> .{_MAX_LOGFILES},
      # ..., .0 -> .1. Going from the highest suffix down moves each file out
      # of the way before the next one is moved onto its name.
      existing_names = set(os.listdir(outdir))
      for idx in reversed(range(_MAX_LOGFILES)):
        current_name = f'{_LOGFILE_NAME}.{idx}'
        if current_name in existing_names:
          shutil.move(os.path.join(outdir, current_name),
                      os.path.join(outdir, f'{_LOGFILE_NAME}.{idx + 1}'))

      # Create a new 0th logfile.
      logfile = latest_logfile.open('wt', encoding='utf-8')
      logfile.write(FIRST_LOG_LINE.format(build_id=build_id, outdir=outdir))
      logfile.flush()
      cls._logfiles[build_id] = logfile
      return logfile
144
145
class TaskStats:
  """Class to keep track of aggregate stats for all tasks across threads.

  All counters are class-level and every accessor holds the class lock.
  """
  _num_processes = 0
  _completed_tasks = 0
  _total_tasks = 0
  _lock = threading.RLock()

  @classmethod
  def no_running_processes(cls):
    with cls._lock:
      return not cls._num_processes

  @classmethod
  def add_task(cls):
    with cls._lock:
      cls._total_tasks = cls._total_tasks + 1

  @classmethod
  def add_process(cls):
    with cls._lock:
      cls._num_processes = cls._num_processes + 1

  @classmethod
  def remove_process(cls):
    with cls._lock:
      cls._num_processes = cls._num_processes - 1

  @classmethod
  def complete_task(cls):
    with cls._lock:
      cls._completed_tasks = cls._completed_tasks + 1

  @classmethod
  def num_pending_tasks(cls):
    with cls._lock:
      return cls._total_tasks - cls._completed_tasks

  @classmethod
  def num_completed_tasks(cls):
    with cls._lock:
      return cls._completed_tasks

  @classmethod
  def total_tasks(cls):
    with cls._lock:
      return cls._total_tasks

  @classmethod
  def get_title_message(cls):
    """Text used as the remote terminal title while work is outstanding."""
    with cls._lock:
      return 'Analysis Steps: {}/{}'.format(cls._completed_tasks,
                                            cls._total_tasks)

  @classmethod
  def query_build(cls, query_build_id: str = None):
    """Returns the server pid plus info for one build, or for all builds.

    An unknown `query_build_id` produces an empty 'builds' list.
    """
    if query_build_id:
      build = BuildManager.get_build(query_build_id)
      builds = [build] if build else []
    else:
      builds = BuildManager.get_all_builds()
    build_infos = [build.query_build_info() for build in builds]
    return {
        'pid': os.getpid(),
        'builds': build_infos,
    }

  @classmethod
  def prefix(cls, build_id: str = None):
    """Returns 'N process(es), done/total' for one build or for all builds."""
    # Ninja's prefix is: [205 processes, 6/734 @ 6.5/s : 0.922s ]
    # Time taken and task completion rate are not important for the build server
    # since it is always running in the background and uses idle priority for
    # its tasks.
    with cls._lock:
      if build_id:
        build = BuildManager.get_build(build_id)
        counts = (build.process_count(), build.completed_task_count(),
                  build.total_task_count())
      else:
        counts = (cls._num_processes, cls._completed_tasks, cls._total_tasks)
      num_processes, completed, total = counts
      noun = 'processes' if num_processes != 1 else 'process'
      return f'{num_processes} {noun}, {completed}/{total}'
233
234
def check_pid_alive(pid: int):
  """Returns True if signal 0 can be sent to `pid`, False on any OSError.

  Signal 0 delivers nothing; it only performs the existence/permission check.
  Note that a PermissionError (the pid exists but we may not signal it) is
  also reported as not alive.
  """
  try:
    os.kill(pid, 0)
  except OSError:
    alive = False
  else:
    alive = True
  return alive
241
242
@dataclasses.dataclass
class Build:
  """State for one registered builder process and the tasks it has queued.

  Tracks queued tasks, completed tasks, currently running subprocesses and the
  logfile that this build's status lines are appended to.
  """
  id: str
  pid: int
  env: dict
  stdout: IO[str]
  cwd: Optional[str] = None
  _logfile: Optional[IO[str]] = None
  _is_ninja_alive: bool = True
  _tasks: List[Task] = dataclasses.field(default_factory=list)
  # These two counters have no annotation, so they are plain class attributes
  # rather than dataclass fields; the first `+=` creates the instance copy.
  _completed_task_count = 0
  _active_process_count = 0
  _lock: threading.RLock = dataclasses.field(default_factory=threading.RLock,
                                             repr=False,
                                             init=False)

  def __hash__(self):
    identity = (self.id, self.pid, self.cwd)
    return hash(identity)

  def add_task(self, task: Task):
    """Records `task` for this build and hands it to the TaskManager."""
    self._status_update(f'QUEUED {task.name}')
    with self._lock:
      self._tasks.append(task)
    TaskStats.add_task()
    TaskManager.add_task(task)

  def add_process(self, task: Task):
    """Records that a subprocess for `task` is being started."""
    self._status_update(f'STARTING {task.name}')
    with self._lock:
      self._active_process_count += 1
    TaskStats.add_process()

  def task_done(self, task: Task, status_string: str):
    """Marks `task` finished; closes the logfile once the build goes idle."""
    self._status_update(f'{status_string} {task.name}')
    TaskStats.complete_task()
    TaskManager.task_done(task)
    with self._lock:
      self._completed_task_count += 1

    # We synchronize all terminal title info rather than having it per build
    # since if two builds are happening in the same terminal concurrently, both
    # builds will be overriding each other's titles continuously. Usually we
    # only have the one build anyways so it should equivalent in most cases.
    BuildManager.update_remote_titles()
    with self._lock:
      if self.is_active():
        return
      self._logfile.close()
      # Reset in case it's the last build.
      BuildManager.update_remote_titles('')

  def process_complete(self):
    """Records that one of this build's subprocesses has exited."""
    with self._lock:
      self._active_process_count -= 1
    TaskStats.remove_process()

  def ensure_logfile(self):
    """Opens this build's logfile on first use (requires `cwd` to be set)."""
    with self._lock:
      if self._logfile:
        return
      assert self.cwd is not None
      self._logfile = LogfileManager.create_logfile(self.id, self.cwd)

  def log(self, message: str):
    """Appends `message` as one line to this build's logfile, if still open."""
    with self._lock:
      self.ensure_logfile()
      logfile = self._logfile
      # BuildManager#broadcast can call log after the build is done and the
      # log is closed. Might make sense to separate out that flow so we can
      # raise an exception here otherwise.
      if not logfile.closed:
        print(message, file=logfile, flush=True)

  def _status_update(self, status_message):
    # Same line goes to the logfile and (as a status line) to the terminal.
    prefix = f'[{TaskStats.prefix(self.id)}] '
    self.log(f'{prefix}{status_message}')
    print_status(prefix, status_message)

  def total_task_count(self):
    with self._lock:
      return len(self._tasks)

  def completed_task_count(self):
    with self._lock:
      return self._completed_task_count

  def pending_task_count(self):
    with self._lock:
      return len(self._tasks) - self._completed_task_count

  def process_count(self):
    with self._lock:
      return self._active_process_count

  def is_active(self):
    """True while tasks are pending or the builder process is still alive."""
    if self.pending_task_count() > 0:
      return True
    # Ninja is not coming back to life, so once it has been seen dead there is
    # no need to check on it again.
    if not self._is_ninja_alive:
      return False
    self._is_ninja_alive = check_pid_alive(self.pid)
    return self._is_ninja_alive

  def query_build_info(self):
    """Returns a JSON-friendly summary of this build's progress."""
    running = TaskManager.get_current_tasks(self.id)
    info = {}
    info['build_id'] = self.id
    info['is_active'] = self.is_active()
    info['completed_tasks'] = self.completed_task_count()
    info['pending_tasks'] = self.pending_task_count()
    info['active_tasks'] = [task.cmd for task in running]
    info['outdir'] = self.cwd
    return info
354
355
class BuildManager:
  """Registry of known builds and of the terminals they print to."""
  _builds_by_id: dict[str, Build] = dict()
  # Keyed by (st_ino, st_dev) of the opened tty; values are (file, isatty()).
  _cached_ttys: dict[tuple[int, int], tuple[IO[str], bool]] = dict()
  _lock = threading.RLock()

  @classmethod
  def register_builder(cls, env, pid, cwd):
    """Creates a Build for builder process `pid` and records it by build id.

    `env` must contain AUTONINJA_BUILD_ID and AUTONINJA_STDOUT_NAME. It is
    mutated before being stored as the environment for the build's tasks.
    """
    build_id = env['AUTONINJA_BUILD_ID']
    stdout = cls.open_tty(env['AUTONINJA_STDOUT_NAME'])
    # Tells the script not to re-delegate to build server.
    env[server_utils.BUILD_SERVER_ENV_VARIABLE] = '1'

    with cls._lock:
      build = Build(id=build_id,
                    pid=pid,
                    cwd=cwd,
                    env=env,
                    stdout=stdout)
      cls.maybe_init_cwd(build, cwd)
      cls._builds_by_id[build_id] = build
    cls.update_remote_titles()

  @classmethod
  def maybe_init_cwd(cls, build: Build, cwd: str):
    """Sets (or asserts consistency of) `build.cwd` and opens its logfile."""
    if cwd is not None:
      with cls._lock:
        if build.cwd is None:
          build.cwd = cwd
        else:
          assert pathlib.Path(cwd).samefile(
              build.cwd), f'{repr(cwd)} != {repr(build.cwd)}'
        build.ensure_logfile()

  @classmethod
  def get_build(cls, build_id):
    """Returns the Build registered under `build_id`, or None."""
    with cls._lock:
      return cls._builds_by_id.get(build_id, None)

  @classmethod
  def open_tty(cls, tty_path):
    """Opens `tty_path` for appending, reusing an already-open handle.

    Do not open the same tty multiple times. Use st_ino and st_dev to compare
    file descriptors.
    """
    tty = open(tty_path, 'at')
    st = os.stat(tty.fileno())
    tty_key = (st.st_ino, st.st_dev)
    with cls._lock:
      # Dedupes ttys
      if tty_key not in cls._cached_ttys:
        # TTYs are kept open for the lifetime of the server so that broadcast
        # messages (e.g. uncaught exceptions) can be sent to them even if they
        # are not currently building anything.
        cls._cached_ttys[tty_key] = (tty, tty.isatty())
      else:
        tty.close()
      return cls._cached_ttys[tty_key][0]

  @classmethod
  def get_active_builds(cls) -> List[Build]:
    builds = cls.get_all_builds()
    return list(build for build in builds if build.is_active())

  @classmethod
  def get_all_builds(cls) -> List[Build]:
    with cls._lock:
      return list(cls._builds_by_id.values())

  @staticmethod
  def _write_to_tty(tty: IO[str], text: str):
    """Best-effort write to a remote terminal.

    A terminal that has gone away can fail with BrokenPipeError or with other
    OSErrors (such as EIO once the terminal is closed). Either way the server
    must keep running, so the error is ignored.
    """
    try:
      tty.write(text)
      tty.flush()
    except OSError:
      pass

  @classmethod
  def broadcast(cls, msg: str):
    """Sends `msg` to remote ttys, every build's log and the server's stderr.

    Remote ttys are only written when remote printing is enabled. stderr is
    written unless `msg` was already written to that same terminal as one of
    the remote ttys.
    """
    with cls._lock:
      tty_keys = set(cls._cached_ttys)
      ttys = list(cls._cached_ttys.values())
      builds = list(cls._builds_by_id.values())
    remote_print = OptionsManager.should_remote_print()
    if remote_print:
      for tty, _unused in ttys:
        cls._write_to_tty(tty, msg + '\n')
    for build in builds:
      build.log(msg)
    # Write to the current terminal if we have not written to it yet. When
    # remote printing is off, nothing was written to the cached ttys, so the
    # server's terminal must get the message even if it is one of them.
    st = os.stat(sys.stderr.fileno())
    stderr_key = (st.st_ino, st.st_dev)
    if not remote_print or stderr_key not in tty_keys:
      print(msg, file=sys.stderr)

  @classmethod
  def update_remote_titles(cls, new_title=None):
    """Sets the window title of every remote terminal that is a real tty.

    With `new_title=None` the title is derived from TaskStats, or cleared when
    nothing is active or pending.
    """
    if new_title is None:
      if not cls.has_active_builds() and TaskStats.num_pending_tasks() == 0:
        # Setting an empty title causes most terminals to go back to the
        # default title (and at least prevents the tab title from being
        # "Analysis Steps: N/N" forevermore.
        new_title = ''
      else:
        new_title = TaskStats.get_title_message()

    with cls._lock:
      ttys = list(cls._cached_ttys.values())
    for tty, isatty in ttys:
      if isatty:
        # OSC 2 escape sequence: set window title.
        cls._write_to_tty(tty, f'\033]2;{new_title}\007')

  @classmethod
  def has_active_builds(cls):
    return bool(cls.get_active_builds())
466
467
class TaskManager:
  """Class to encapsulate a threadsafe queue and handle deactivating it."""
  # New tasks are added on the left and popped from the right, so tasks are
  # started in the order they were added.
  _queue: collections.deque[Task] = collections.deque()
  # Tasks that have been popped off the queue and handed to Task.start().
  _current_tasks: set[Task] = set()
  _deactivated = False
  _lock = threading.RLock()

  @classmethod
  def add_task(cls, task: Task):
    """Queues `task` and tries to start queued tasks right away."""
    assert not cls._deactivated
    with cls._lock:
      cls._queue.appendleft(task)
    cls._maybe_start_tasks()

  @classmethod
  def task_done(cls, task: Task):
    with cls._lock:
      cls._current_tasks.discard(task)

  @classmethod
  def get_current_tasks(cls, build_id):
    with cls._lock:
      return [t for t in cls._current_tasks if t.build.id == build_id]

  @classmethod
  def deactivate(cls):
    """Stops starting new tasks and terminates every queued or running one."""
    cls._deactivated = True
    tasks_to_terminate: list[Task] = []
    with cls._lock:
      while cls._queue:
        task = cls._queue.pop()
        tasks_to_terminate.append(task)
      # Cancel possibly running tasks.
      tasks_to_terminate.extend(cls._current_tasks)
    # Terminate outside lock since task threads need the lock to finish
    # terminating.
    for task in tasks_to_terminate:
      task.terminate()

  @classmethod
  def cancel_build(cls, build_id):
    """Terminates the queued and running tasks belonging to `build_id`."""
    terminated_pending_tasks: list[Task] = []
    terminated_current_tasks: list[Task] = []
    with cls._lock:
      # Cancel pending tasks.
      for task in cls._queue:
        if task.build.id == build_id:
          terminated_pending_tasks.append(task)
      for task in terminated_pending_tasks:
        cls._queue.remove(task)
      # Cancel running tasks.
      for task in cls._current_tasks:
        if task.build.id == build_id:
          terminated_current_tasks.append(task)
    # Terminate tasks outside lock since task threads need the lock to finish
    # terminating.
    for task in terminated_pending_tasks:
      task.terminate()
    for task in terminated_current_tasks:
      task.terminate()

  @staticmethod
  # pylint: disable=inconsistent-return-statements
  def _num_running_processes():
    """Reads the system-wide `procs_running` count from /proc/stat (Linux)."""
    with open('/proc/stat') as f:
      for line in f:
        if line.startswith('procs_running'):
          return int(line.rstrip().split()[1])
    assert False, 'Could not read /proc/stat'

  @classmethod
  def _maybe_start_tasks(cls):
    """Starts up to 2 queued tasks, depending on current system load."""
    if cls._deactivated:
      return
    # Include load avg so that a small dip in the number of currently running
    # processes will not cause new tasks to be started while the overall load is
    # heavy.
    cur_load = max(cls._num_running_processes(), os.getloadavg()[0])
    # os.cpu_count() returns None when the number of CPUs is undetermined;
    # fall back to 1 so the comparison below cannot raise TypeError.
    cpu_count = os.cpu_count() or 1
    num_started = 0
    # Always start a task if we don't have any running, so that all tasks are
    # eventually finished. Try starting up tasks when the overall load is light.
    # Limit to at most 2 new tasks to prevent ramping up too fast. There is a
    # chance where multiple threads call _maybe_start_tasks and each gets to
    # spawn up to 2 new tasks, but since the only downside is some build tasks
    # get worked on earlier rather than later, it is not worth mitigating.
    while num_started < 2 and (TaskStats.no_running_processes()
                               or num_started + cur_load < cpu_count):
      with cls._lock:
        try:
          next_task = cls._queue.pop()
          cls._current_tasks.add(next_task)
        except IndexError:
          return
      num_started += next_task.start(cls._maybe_start_tasks)
562
563
# TODO(wnwen): Break this into Request (encapsulating what ninja sends) and Task
#              when a Request starts to be run. This would eliminate ambiguity
#              about when and whether _proc/_thread are initialized.
class Task:
  """Class to represent one task and operations on it.

  Hashing and equality use `key` (the build's cwd plus the task name) together
  with the owning build object, so a set holds at most one task per name per
  build.
  """

  def __init__(self, name: str, build: Build, cmd: List[str], stamp_file: str):
    self.name = name
    self.build = build
    self.cmd = cmd
    # Joined onto build.cwd and deleted when the task fails or is terminated
    # (unless replaced) so the target is considered dirty.
    self.stamp_file = stamp_file
    self._terminated = False
    self._replaced = False
    self._lock = threading.RLock()
    self._proc: Optional[subprocess.Popen] = None
    self._thread: Optional[threading.Thread] = None
    self._delete_stamp_thread: Optional[threading.Thread] = None
    self._return_code: Optional[int] = None

  @property
  def key(self):
    return (self.build.cwd, self.name)

  def __hash__(self):
    return hash((self.key, self.build.id))

  def __eq__(self, other):
    # Returning NotImplemented for foreign types lets Python fall back to its
    # default comparison instead of raising AttributeError on `other.key`.
    if not isinstance(other, Task):
      return NotImplemented
    return self.key == other.key and self.build is other.build

  def start(self, on_complete_callback: Callable[[], None]) -> int:
    """Starts the task if it has not already been terminated.

    Returns the number of processes that have been started. This is called at
    most once when the task is popped off the task queue."""
    with self._lock:
      if self._terminated:
        return 0

      # Use os.nice(19) to ensure the lowest priority (idle) for these analysis
      # tasks since we want to avoid slowing down the actual build.
      # TODO(wnwen): Use ionice to reduce resource consumption.
      self.build.add_process(self)
      # This use of preexec_fn is sufficiently simple, just one os.nice call.
      # pylint: disable=subprocess-popen-preexec-fn
      self._proc = subprocess.Popen(
          self.cmd,
          stdout=subprocess.PIPE,
          stderr=subprocess.STDOUT,
          cwd=self.build.cwd,
          env=self.build.env,
          text=True,
          preexec_fn=lambda: os.nice(19),
      )
      self._thread = threading.Thread(
          target=self._complete_when_process_finishes,
          args=(on_complete_callback, ))
      self._thread.start()
      return 1

  def terminate(self, replaced=False):
    """Can be called multiple times to cancel and ignore the task's output."""
    with self._lock:
      if self._terminated:
        return
      self._terminated = True
      self._replaced = replaced

    # It is safe to access _proc and _thread outside of _lock since they are
    # only changed by self.start holding _lock when self._terminate is false.
    # Since we have just set self._terminate to true inside of _lock, we know
    # that neither _proc nor _thread will be changed from this point onwards.
    if self._proc:
      self._proc.terminate()
      self._proc.wait()
    # Ensure that self._complete is called either by the thread or by us.
    if self._thread:
      self._thread.join()
    else:
      self._complete()

  def _complete_when_process_finishes(self,
                                      on_complete_callback: Callable[[], None]):
    """Thread body: waits for the subprocess, then reports completion."""
    assert self._proc
    # We know Popen.communicate will return a str and not a byte since it is
    # constructed with text=True.
    stdout: str = self._proc.communicate()[0]
    self._return_code = self._proc.returncode
    self.build.process_complete()
    self._complete(stdout)
    on_complete_callback()

  def _complete(self, stdout: str = ''):
    """Update the user and ninja after the task has run or been terminated.

    This method should only be run once per task. Avoid modifying the task so
    that this method does not need locking. Any output or a non-zero return
    code counts as failure."""

    delete_stamp = False
    status_string = 'FINISHED'
    if self._terminated:
      status_string = 'TERMINATED'
      # When tasks are replaced, avoid deleting the stamp file, context:
      # https://issuetracker.google.com/301961827.
      if not self._replaced:
        delete_stamp = True
    elif stdout or self._return_code != 0:
      status_string = 'FAILED'
      delete_stamp = True
      preamble = [
          f'FAILED: {self.name}',
          f'Return code: {self._return_code}',
          'CMD: ' + shlex.join(self.cmd),
          'STDOUT:',
      ]

      message = '\n'.join(preamble + [stdout])
      self.build.log(message)
      server_log(message)

      if OptionsManager.should_remote_print():
        # Add emoji to show that output is from the build server.
        preamble = [f'⏩ {line}' for line in preamble]
        remote_message = '\n'.join(preamble + [stdout])
        # Add a new line at start of message to clearly delineate from previous
        # output/text already on the remote tty we are printing to.
        try:
          self.build.stdout.write(f'\n{remote_message}')
          self.build.stdout.flush()
        except OSError:
          # The terminal that started the build may be gone. Raising here
          # would skip deleting the stamp file and task_done() below, leaving
          # this task counted as pending forever.
          pass
    if delete_stamp:
      # Force siso to consider failed targets as dirty.
      try:
        os.unlink(os.path.join(self.build.cwd, self.stamp_file))
      except FileNotFoundError:
        pass
    self.build.task_done(self, status_string)
698
699
def _handle_add_task(data, current_tasks: Dict[Tuple[str, str], Task]):
  """Handle messages of type ADD_TASK.

  A previously added task with the same key is terminated as "replaced"
  (its stamp file is kept) before the new task is queued on its build.
  """
  build = BuildManager.get_build(data['build_id'])
  BuildManager.maybe_init_cwd(build, data.get('cwd'))

  new_task = Task(name=data['name'],
                  cmd=data['cmd'],
                  build=build,
                  stamp_file=data['stamp_file'])
  task_key = new_task.key
  previous_task = current_tasks.get(task_key)
  if previous_task:
    previous_task.terminate(replaced=True)
  current_tasks[task_key] = new_task

  build.add_task(new_task)
716
717
def _handle_query_build(data, connection: socket.socket):
  """Handle messages of type QUERY_BUILD by replying with build info."""
  reply = TaskStats.query_build(data['build_id'])
  try:
    with connection:
      server_utils.SendMessage(connection, reply)
  except BrokenPipeError:
    # The client hung up early; that must not take the server down.
    pass
728
729
def _handle_heartbeat(connection: socket.socket):
  """Handle messages of type POLL_HEARTBEAT by replying with status and pid."""
  reply = {
      'status': 'OK',
      'pid': os.getpid(),
  }
  try:
    with connection:
      server_utils.SendMessage(connection, reply)
  except BrokenPipeError:
    # The client hung up early; that must not take the server down.
    pass
741
742
def _handle_register_builder(data):
  """Handle messages of type REGISTER_BUILDER."""
  BuildManager.register_builder(data['env'], int(data['builder_pid']),
                                data['cwd'])
750
751
def _handle_cancel_build(data):
  """Handle messages of type CANCEL_BUILD.

  Terminates the build's queued and running tasks, then clears the remote
  terminal titles.
  """
  TaskManager.cancel_build(data['build_id'])
  BuildManager.update_remote_titles('')
757
758
def _listen_for_request_data(sock: socket.socket):
  """Helper to encapsulate getting a new message.

  Accepts connections on `sock` forever and yields (message, connection) for
  every connection that delivers a non-empty message. The consumer is then
  responsible for closing the yielded connection; connections that deliver
  nothing, or whose message cannot be read, are closed here.
  """
  while True:
    conn = sock.accept()[0]
    try:
      message = server_utils.ReceiveMessage(conn)
    except BaseException:
      # Do not leak the accepted socket if reading the request fails.
      conn.close()
      raise
    if message:
      yield message, conn
    else:
      # Nobody else will ever close this connection, so close it now instead
      # of leaking the file descriptor.
      conn.close()
766
767
def _register_cleanup_signal_handlers():
  """Installs SIGINT/SIGTERM handlers that deactivate the TaskManager first.

  After cleanup, the signal is forwarded to the handler that was installed
  before. If that handler is not callable, SIGINT raises KeyboardInterrupt
  and SIGTERM exits with status 1.
  """
  previous_handlers = {
      signal.SIGINT: signal.getsignal(signal.SIGINT),
      signal.SIGTERM: signal.getsignal(signal.SIGTERM),
  }

  def _cleanup(signum, frame):
    server_log('STOPPING SERVER...')
    # Gracefully shut down the task manager, terminating all queued tasks.
    TaskManager.deactivate()
    server_log('STOPPED')
    if signum not in previous_handlers:
      return
    previous = previous_handlers[signum]
    # Sometimes the previous handler is not a callable (e.g. SIG_DFL).
    if callable(previous):
      previous(signum, frame)
    elif signum == signal.SIGINT:
      raise KeyboardInterrupt()
    else:
      sys.exit(1)

  signal.signal(signal.SIGINT, _cleanup)
  signal.signal(signal.SIGTERM, _cleanup)
791
792
def _process_requests(sock: socket.socket, exit_on_idle: bool):
  """Main loop for build server receiving request messages.

  Runs until KeyboardInterrupt or, when `exit_on_idle` is set, until waiting
  for a request raises TimeoutError while no tasks are pending and no build
  is active. Remote terminal titles are cleared on the way out.
  """
  # Since dicts in python can contain anything, explicitly type tasks to help
  # make static type checking more useful.
  tasks: Dict[Tuple[str, str], Task] = {}
  server_log(
      'READY... Remember to set android_static_analysis="build_server" in '
      'args.gn files')
  _register_cleanup_signal_handlers()
  while True:
    try:
      for data, connection in _listen_for_request_data(sock):
        kind = data.get('message_type', server_utils.ADD_TASK)
        # Requests that expect a reply hand the open connection to a handler.
        if kind == server_utils.POLL_HEARTBEAT:
          _handle_heartbeat(connection)
          continue
        if kind == server_utils.QUERY_BUILD:
          _handle_query_build(data, connection)
          continue
        # Every other request is one-way: hang up before doing the work.
        connection.close()
        if kind == server_utils.ADD_TASK:
          _handle_add_task(data, tasks)
        elif kind == server_utils.REGISTER_BUILDER:
          _handle_register_builder(data)
        elif kind == server_utils.CANCEL_BUILD:
          _handle_cancel_build(data)
    except TimeoutError:
      # If we have not received a new task in a while and do not have any
      # pending tasks or running builds, then exit. Otherwise keep waiting.
      idle = (TaskStats.num_pending_tasks() == 0
              and not BuildManager.has_active_builds())
      if idle and exit_on_idle:
        break
    except KeyboardInterrupt:
      break
  BuildManager.update_remote_titles('')
831
832
def query_build_info(build_id=None):
  """Communicates with the main server to query build info.

  With `build_id=None` the server reports on all of its builds.
  """
  request = {
      'message_type': server_utils.QUERY_BUILD,
      'build_id': build_id,
  }
  return _send_message_with_response(request)
839
840
def _wait_for_build(build_id):
  """Communicates with the main server waiting for a build to complete.

  Polls once per second, showing the number of pending tasks for `build_id`
  on a single updating line. Returns 0 once the build has no pending tasks,
  when no server is listening, or when the server has no record of
  `build_id`.
  """
  start_time = datetime.datetime.now()
  while True:
    try:
      builds = query_build_info(build_id)['builds']
    except ConnectionRefusedError:
      print('No server running. It likely finished all tasks.')
      print('You can check $OUTDIR/buildserver.log.0 to be sure.')
      return 0

    # The server replies with an empty list for a build id it does not know;
    # indexing into it would crash with an IndexError.
    if not builds:
      print(f'\nNo build with build_id: {build_id} is known to the server.')
      return 0

    pending_tasks = builds[0]['pending_tasks']

    if pending_tasks == 0:
      print(f'\nAll tasks completed for build_id: {build_id}.')
      return 0

    current_time = datetime.datetime.now()
    duration = current_time - start_time
    print(f'\rWaiting for {pending_tasks} tasks [{str(duration)}]\033[K',
          end='',
          flush=True)
    time.sleep(1)
864
865
def _wait_for_idle():
  """Communicates with the main server waiting for all builds to complete.

  Every 0.5s prints a progress line aggregated over builds that are still
  active or still have pending tasks. Returns 0 once none of those has pending
  tasks, or when no server is listening.
  """
  start_time = datetime.datetime.now()
  while True:
    try:
      builds = query_build_info()['builds']
    except ConnectionRefusedError:
      print('No server running. It likely finished all tasks.')
      print('You can check $OUTDIR/buildserver.log.0 to be sure.')
      return 0

    all_pending_tasks = all_completed_tasks = 0
    for info in builds:
      pending, completed = info['pending_tasks'], info['completed_tasks']
      # Ignore completed builds.
      if info['is_active'] or pending:
        all_pending_tasks += pending
        all_completed_tasks += completed
    total_tasks = all_pending_tasks + all_completed_tasks

    if not all_pending_tasks:
      print('\nServer Idle, All tasks complete.')
      return 0

    elapsed = datetime.datetime.now() - start_time
    print(
        f'\rWaiting for {all_pending_tasks} remaining tasks. '
        f'({all_completed_tasks}/{total_tasks} tasks complete) '
        f'[{str(elapsed)}]\033[K',
        end='',
        flush=True)
    time.sleep(0.5)
902
903
def _check_if_running():
  """Communicates with the main server to make sure it's running.

  Returns 0 if a connection to the server socket succeeds; otherwise prints
  instructions for starting the server and returns 1.
  """
  with socket.socket(socket.AF_UNIX) as sock:
    try:
      sock.connect(server_utils.SOCKET_ADDRESS)
    except OSError:
      print('Build server is not running and '
            'android_static_analysis="build_server" is set.\nPlease run '
            'this command in a separate terminal:\n\n'
            '$ build/android/fast_local_dev_server.py\n')
      return 1
  return 0
917
918
def _send_message_and_close(message_dict):
  """Sends |message_dict| to the server on a fresh connection; reads no reply.

  Errors from connect() are not caught here. Once connected, socket
  operations time out after 1 second.
  """
  with socket.socket(socket.AF_UNIX) as conn:
    conn.connect(server_utils.SOCKET_ADDRESS)
    conn.settimeout(1)
    server_utils.SendMessage(conn, message_dict)
924
925
def _send_message_with_response(message_dict):
  """Sends |message_dict| to the server and returns what it sends back.

  Opens a new connection per call. Once connected, socket operations time out
  after 1 second. The return value is whatever server_utils.ReceiveMessage
  yields (callers index it like a dict, e.g. reply['pid']).
  """
  with socket.socket(socket.AF_UNIX) as conn:
    conn.connect(server_utils.SOCKET_ADDRESS)
    conn.settimeout(1)
    server_utils.SendMessage(conn, message_dict)
    reply = server_utils.ReceiveMessage(conn)
  return reply
932
933
def _send_cancel_build(build_id):
  """Asks the server to cancel all pending and running tasks for |build_id|.

  Returns:
    0 (process exit status) once the request has been sent.
  """
  cancel_request = {
      'message_type': server_utils.CANCEL_BUILD,
      'build_id': build_id,
  }
  _send_message_and_close(cancel_request)
  return 0
940
941
def _register_builder(build_id, builder_pid, output_directory):
  """Informs the build server that a new build has started.

  Sends the current environment, |builder_pid| and the absolute form of
  |output_directory| to the server. Makes up to 3 attempts, pausing briefly
  after each socket error.

  Args:
    build_id: ID of the build being registered. It is only used in the
      failure message here. NOTE(review): the server presumably derives the
      build ID from the forwarded environment; confirm.
    builder_pid: PID of the builder process, passed through as received (from
      main() it is the raw --builder-pid string, or None).
    output_directory: Build output directory, or None.

  Returns:
    0 on success, 1 if every attempt failed.
  """
  if output_directory is not None:
    output_directory = str(pathlib.Path(output_directory).absolute())
  for _attempt in range(3):
    try:
      # Ensure environment variables that the server expects to be there are
      # present.
      server_utils.AssertEnvironmentVariables()

      _send_message_and_close({
          'message_type': server_utils.REGISTER_BUILDER,
          'env': dict(os.environ),
          'builder_pid': builder_pid,
          'cwd': output_directory,
      })
      return 0
    except OSError:
      # Treat socket errors as transient: wait a moment, then retry.
      time.sleep(0.05)
  print(f'Failed to register builder for build_id={build_id}.')
  return 1
962
963
def poll_server(retries=3):
  """Sends a heartbeat to the server and returns the pid it reports.

  Makes up to |retries| attempts. A socket error causes a short pause before
  the next attempt; an empty reply moves on to the next attempt immediately.

  Returns:
    The server's pid, or None if no attempt produced a non-empty reply.
  """
  for _ in range(retries):
    try:
      reply = _send_message_with_response(
          {'message_type': server_utils.POLL_HEARTBEAT})
    except OSError:
      time.sleep(0.05)
      continue
    if reply:
      return reply['pid']
  return None
977
978
def _print_build_status_all():
  """Prints a status summary for every build registered with the server.

  Prints one line per build. After all builds, if any tasks are executing, it
  prints up to four of those commands (sorted, each truncated to 200
  characters), gathered across all builds.

  Returns:
    0, including when no server is running.
  """
  try:
    query_data = query_build_info(None)
  except ConnectionRefusedError:
    print('No server running. Consult $OUTDIR/buildserver.log.0')
    return 0
  builds = query_data['builds']
  pid = query_data['pid']
  all_active_tasks = []
  print(f'Build server (PID={pid}) has {len(builds)} registered builds')
  for build_info in builds:
    build_id = build_info['build_id']
    pending_tasks = build_info['pending_tasks']
    completed_tasks = build_info['completed_tasks']
    # Presumably each entry is an argv-style list of strings (it is passed to
    # shlex.join below); confirm against the server.
    active_tasks = build_info['active_tasks']
    out_dir = build_info['outdir']
    active = build_info['is_active']
    total_tasks = pending_tasks + completed_tasks
    all_active_tasks += active_tasks
    if total_tasks == 0 and not active:
      status = 'Finished without any jobs'
    else:
      if active:
        status = 'Siso still running'
      else:
        status = 'Siso finished'
      if out_dir:
        status += f' in {out_dir}'
      status += f'. Completed [{completed_tasks}/{total_tasks}].'
      if completed_tasks < total_tasks:
        status += f' {len(active_tasks)} task(s) currently executing'
    print(f'{build_id}: {status}')
  # Emit the combined list once, after every build has been summarized.
  # Previously this sat inside the loop and re-printed the growing list after
  # each build.
  if all_active_tasks:
    total = len(all_active_tasks)
    to_show = min(4, total)
    print(f'Currently executing (showing {to_show} of {total}):')
    for cmd in sorted(all_active_tasks)[:to_show]:
      truncated = shlex.join(cmd)
      if len(truncated) > 200:
        truncated = truncated[:200] + '...'
      print(truncated)
  return 0
1021
1022
def _print_build_status(build_id):
  """Reports whether |build_id| still has static analysis jobs in flight.

  If the build has no pending tasks, nothing is printed. Otherwise it prints
  the pending count and the command to wait for those jobs.

  Returns:
    1 if the server has no build with this ID, otherwise 0 (including when
    no server is running).
  """
  server_path = os.path.relpath(str(server_utils.SERVER_SCRIPT))
  try:
    matching_builds = query_build_info(build_id)['builds']
  except ConnectionRefusedError:
    print('⚠️ No server running. Consult $OUTDIR/buildserver.log.0')
    return 0

  if not matching_builds:
    print(f'⚠️ No build found with id ({build_id})')
    print('⚠️ To see the status of all builds:',
          shlex.join([server_path, '--print-status-all']))
    return 1

  remaining = matching_builds[0]['pending_tasks']
  if not remaining:
    return 0

  if remaining == 1:
    verb, noun = 'is', 'job'
  else:
    verb, noun = 'are', 'jobs'
  print(f'⏩ There {verb} still {remaining} static analysis {noun}'
        ' running in the background.')
  print('⏩ To wait for them:', shlex.join([server_path, '--wait-for-idle']))
  return 0
1046
1047
def _wait_for_task_requests(exit_on_idle):
  """Runs the server: binds the shared socket address and serves requests.

  Args:
    exit_on_idle: Forwarded unchanged to _process_requests.

  Returns:
    0 after _process_requests returns, or 1 if the address is already in use
    (another server instance owns it). Other bind errors are re-raised.
  """
  import errno  # Local import: the module's import block is not touched here.

  with socket.socket(socket.AF_UNIX) as sock:
    sock.settimeout(_SOCKET_TIMEOUT)
    try:
      sock.bind(server_utils.SOCKET_ADDRESS)
    except OSError as e:
      # Compare against the symbolic constant rather than the Linux-specific
      # value 98, so the check remains correct where EADDRINUSE differs.
      if e.errno != errno.EADDRINUSE:
        raise
      if not OptionsManager.is_quiet():
        pid = poll_server()
        print(f'Another instance is already running (pid: {pid}).',
              file=sys.stderr)
      return 1
    sock.listen()
    _process_requests(sock, exit_on_idle)
  return 0
1065
1066
def _build_cli_parser():
  """Returns the argparse parser describing this script's command line."""
  parser = argparse.ArgumentParser(description=__doc__)
  parser.add_argument(
      '--fail-if-not-running',
      action='store_true',
      help='Used by GN to fail fast if the build server is not running.')
  parser.add_argument(
      '--exit-on-idle',
      action='store_true',
      help='Server started on demand. Exit when all tasks run out.')
  parser.add_argument('--quiet',
                      action='store_true',
                      help='Do not output status updates.')
  parser.add_argument('--no-remote-print',
                      action='store_true',
                      help='Do not output errors to remote terminals.')
  parser.add_argument('--wait-for-build',
                      metavar='BUILD_ID',
                      help='Wait for build server to finish with all tasks '
                      'for BUILD_ID and output any pending messages.')
  parser.add_argument('--wait-for-idle',
                      action='store_true',
                      help='Wait for build server to finish with all '
                      'pending tasks.')
  parser.add_argument('--print-status',
                      metavar='BUILD_ID',
                      help='Print the current state of a build.')
  parser.add_argument('--print-status-all',
                      action='store_true',
                      help='Print the current state of all active builds.')
  parser.add_argument(
      '--register-build-id',
      metavar='BUILD_ID',
      help='Inform the build server that a new build has started.')
  parser.add_argument('--output-directory',
                      help='Build directory (use with --register-build-id)')
  parser.add_argument('--builder-pid',
                      help='Builder process\'s pid for build BUILD_ID.')
  parser.add_argument('--cancel-build',
                      metavar='BUILD_ID',
                      help='Cancel all pending and running tasks for BUILD_ID.')
  return parser


def main():
  """Parses the command line and runs the selected mode.

  Client flags are checked in a fixed priority order, and the first one that
  is set decides the action. If none is set, this process runs as the server.

  Returns:
    The process exit status.
  """
  args = _build_cli_parser().parse_args()
  OptionsManager.set_options(args)

  # (flag value, action) pairs in priority order. Actions that need arguments
  # are wrapped in lambdas so only the selected one is ever invoked.
  client_modes = (
      (args.fail_if_not_running, _check_if_running),
      (args.wait_for_build, lambda: _wait_for_build(args.wait_for_build)),
      (args.wait_for_idle, _wait_for_idle),
      (args.print_status, lambda: _print_build_status(args.print_status)),
      (args.print_status_all, _print_build_status_all),
      (args.register_build_id,
       lambda: _register_builder(args.register_build_id, args.builder_pid,
                                 args.output_directory)),
      (args.cancel_build, lambda: _send_cancel_build(args.cancel_build)),
  )
  for selected, run_mode in client_modes:
    if selected:
      return run_mode()
  # No client flag was given: act as the server itself.
  return _wait_for_task_requests(args.exit_on_idle)
1128
1129
if __name__ == '__main__':
  # Route any uncaught exception through _exception_hook, then use main()'s
  # return value as the process exit status.
  sys.excepthook = _exception_hook
  sys.exit(main())
1133