#!/usr/bin/env python3
# Copyright 2021 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
5"""Creates an server to offload non-critical-path GN targets."""

from __future__ import annotations

import argparse
import collections
import contextlib
import datetime
import json
import os
import pathlib
import re
import shutil
import socket
import subprocess
import sys
import threading
import traceback
import time
from typing import Callable, Dict, List, Optional, Tuple, IO

sys.path.append(os.path.join(os.path.dirname(__file__), 'gyp'))
from util import server_utils

_SOCKET_TIMEOUT = 300  # seconds

_LOGFILES = {}
_LOGFILE_NAME = 'buildserver.log'
_MAX_LOGFILES = 6

FIRST_LOG_LINE = '#### Start of log for build_id = {build_id} ####\n'
BUILD_ID_RE = re.compile(r'^#### .*build_id = (?P<build_id>.+) ####')
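# BUILD_ID_RE recovers the build_id from the FIRST_LOG_LINE written at the top
# of each logfile; create_logfile() uses it to detect that the newest logfile
# already belongs to the current build (e.g. after a crash) and keeps appending
# to it.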


def log(msg: str, quiet: bool = False):
  if quiet:
    return
  # Ensure we start our message on a new line.
  print('\n' + msg)

def set_status(msg: str, *, quiet: bool = False, build_id: str = None):
  prefix = f'[{TaskStats.prefix()}] '
  # If the message is specific to a build, also output it to that build's
  # logfile.
  if build_id:
    log_to_file(f'{prefix}{msg}', build_id=build_id)

  # No need to also output to the terminal if quiet.
  if quiet:
    return
  # Shrink the message (keep a 2-char prefix and use the rest of the room for
  # the suffix) according to the terminal size so it always fits on one line.
  width = shutil.get_terminal_size().columns
  max_msg_width = width - len(prefix)
  if len(msg) > max_msg_width:
    length_to_show = max_msg_width - 5  # Account for ellipsis and header.
    msg = f'{msg[:2]}...{msg[-length_to_show:]}'
  # \r returns the cursor to the beginning of the line.
  # \033[K erases from the cursor to the end of the line (instead of printing
  #     the usual \n).
  # Avoid the default line ending so the next \r overwrites the same line just
  #     like ninja's output.
  print(f'\r{prefix}{msg}\033[K', end='', flush=True)


def log_to_file(message: str, build_id: str):
  logfile = _LOGFILES.get(build_id)
  print(message, file=logfile, flush=True)


def _exception_hook(exctype: type, exc: Exception, tb):
  # Output uncaught exceptions to all live terminals.
  BuildManager.broadcast(''.join(traceback.format_exception(exctype, exc, tb)))
  # Cancel all pending tasks cleanly (i.e. delete stamp files if necessary).
  TaskManager.deactivate()
  sys.__excepthook__(exctype, exc, tb)


def create_logfile(build_id, outdir):
  if logfile := _LOGFILES.get(build_id, None):
    return logfile

  outdir = pathlib.Path(outdir)
  latest_logfile = outdir / f'{_LOGFILE_NAME}.0'

  if latest_logfile.exists():
    with latest_logfile.open('rt') as f:
      first_line = f.readline()
      if log_build_id := BUILD_ID_RE.search(first_line):
        # If the newest logfile on disk is referencing the same build we are
        # currently processing, we probably crashed previously and we should
        # pick up where we left off in the same logfile.
        if log_build_id.group('build_id') == build_id:
          _LOGFILES[build_id] = latest_logfile.open('at')
          return _LOGFILES[build_id]

  # Rotate the logfiles: bump each existing logfile's numeric suffix by one.
  filenames = os.listdir(outdir)
  logfiles = {f for f in filenames if f.startswith(_LOGFILE_NAME)}
  for idx in reversed(range(_MAX_LOGFILES)):
    current_name = f'{_LOGFILE_NAME}.{idx}'
    next_name = f'{_LOGFILE_NAME}.{idx+1}'
    if current_name in logfiles:
      shutil.move(os.path.join(outdir, current_name),
                  os.path.join(outdir, next_name))

  # Create a new 0th logfile.
  logfile = latest_logfile.open('wt')
  _LOGFILES[build_id] = logfile
  logfile.write(FIRST_LOG_LINE.format(build_id=build_id))
  logfile.flush()
  return logfile


class TaskStats:
  """Class to keep track of aggregate stats for all tasks across threads."""
  _num_processes = 0
  _completed_tasks = 0
  _total_tasks = 0
  _total_task_count_per_build = collections.defaultdict(int)
  _completed_task_count_per_build = collections.defaultdict(int)
  _running_processes_count_per_build = collections.defaultdict(int)
  _lock = threading.Lock()

  @classmethod
  def no_running_processes(cls):
    with cls._lock:
      return cls._num_processes == 0

  @classmethod
  def add_task(cls, build_id: str):
    with cls._lock:
      cls._total_tasks += 1
      cls._total_task_count_per_build[build_id] += 1

  @classmethod
  def add_process(cls, build_id: str):
    with cls._lock:
      cls._num_processes += 1
      cls._running_processes_count_per_build[build_id] += 1

  @classmethod
  def remove_process(cls, build_id: str):
    with cls._lock:
      cls._num_processes -= 1
      cls._running_processes_count_per_build[build_id] -= 1

  @classmethod
  def complete_task(cls, build_id: str):
    with cls._lock:
      cls._completed_tasks += 1
      cls._completed_task_count_per_build[build_id] += 1

  @classmethod
  def num_pending_tasks(cls, build_id: str = None):
    with cls._lock:
      if build_id:
        return cls._total_task_count_per_build[
            build_id] - cls._completed_task_count_per_build[build_id]
      return cls._total_tasks - cls._completed_tasks

  @classmethod
  def num_completed_tasks(cls, build_id: str = None):
    with cls._lock:
      if build_id:
        return cls._completed_task_count_per_build[build_id]
      return cls._completed_tasks

  @classmethod
  def prefix(cls, build_id: str = None):
    # Ninja's prefix is: [205 processes, 6/734 @ 6.5/s : 0.922s ]
    # Time taken and task completion rate are not important for the build server
    # since it is always running in the background and uses idle priority for
    # its tasks.
    with cls._lock:
      if build_id:
        _num_processes = cls._running_processes_count_per_build[build_id]
        _completed_tasks = cls._completed_task_count_per_build[build_id]
        _total_tasks = cls._total_task_count_per_build[build_id]
      else:
        _num_processes = cls._num_processes
        _completed_tasks = cls._completed_tasks
        _total_tasks = cls._total_tasks
      word = 'process' if _num_processes == 1 else 'processes'
      return (f'{_num_processes} {word}, '
              f'{_completed_tasks}/{_total_tasks}')


def check_pid_alive(pid: int):
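  # Signal 0 does not actually send a signal; os.kill raises OSError if the
  # pid does not exist (or we lack permission to signal it).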
  try:
    os.kill(pid, 0)
  except OSError:
    return False
  return True


class BuildManager:
  _live_builders: dict[str, int] = dict()
  _build_ttys: dict[str, IO[str]] = dict()
  _lock = threading.RLock()

  @classmethod
  def register_builder(cls, build_id, builder_pid):
    with cls._lock:
      cls._live_builders[build_id] = int(builder_pid)

  @classmethod
  def register_tty(cls, build_id, tty):
    with cls._lock:
      cls._build_ttys[build_id] = tty

  @classmethod
  def get_live_builds(cls):
    with cls._lock:
      for build_id, builder_pid in list(cls._live_builders.items()):
        if not check_pid_alive(builder_pid):
          del cls._live_builders[build_id]
      return list(cls._live_builders.keys())

  @classmethod
  def broadcast(cls, msg: str):
    seen = set()
    with cls._lock:
      for tty in cls._build_ttys.values():
        # Do not output to the same tty multiple times. Use st_ino and st_dev to
        # compare open file descriptors.
        st = os.stat(tty.fileno())
        key = (st.st_ino, st.st_dev)
        if key in seen:
          continue
        seen.add(key)
        try:
          tty.write(msg + '\n')
          tty.flush()
        except BrokenPipeError:
          pass

  @classmethod
  def has_live_builds(cls):
    return bool(cls.get_live_builds())


class TaskManager:
  """Class to encapsulate a threadsafe queue and handle deactivating it."""
  _queue: collections.deque[Task] = collections.deque()
  _deactivated = False
  _lock = threading.RLock()

  @classmethod
  def add_task(cls, task: Task, options):
    assert not cls._deactivated
    TaskStats.add_task(build_id=task.build_id)
    with cls._lock:
      cls._queue.appendleft(task)
    set_status(f'QUEUED {task.name}',
               quiet=options.quiet,
               build_id=task.build_id)
    cls._maybe_start_tasks()

  @classmethod
  def deactivate(cls):
    cls._deactivated = True
    with cls._lock:
      while cls._queue:
        task = cls._queue.pop()
        task.terminate()

  @classmethod
  def cancel_build(cls, build_id):
    terminated_tasks = []
    with cls._lock:
      for task in cls._queue:
        if task.build_id == build_id:
          task.terminate()
          terminated_tasks.append(task)
      for task in terminated_tasks:
        cls._queue.remove(task)

  @staticmethod
  # pylint: disable=inconsistent-return-statements
  def _num_running_processes():
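    # /proc/stat contains a 'procs_running <N>' line giving the number of
    # currently runnable processes (Linux-only).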
    with open('/proc/stat') as f:
      for line in f:
        if line.startswith('procs_running'):
          return int(line.rstrip().split()[1])
    assert False, 'Could not read /proc/stat'

  @classmethod
  def _maybe_start_tasks(cls):
    if cls._deactivated:
      return
    # Include load avg so that a small dip in the number of currently running
    # processes will not cause new tasks to be started while the overall load is
    # heavy.
    cur_load = max(cls._num_running_processes(), os.getloadavg()[0])
    num_started = 0
    # Always start a task if we don't have any running, so that all tasks are
    # eventually finished. Try starting up tasks when the overall load is light.
    # Limit to at most 2 new tasks to prevent ramping up too fast. There is a
    # chance that multiple threads call _maybe_start_tasks and each gets to
    # spawn up to 2 new tasks, but since the only downside is some build tasks
    # get worked on earlier rather than later, it is not worth mitigating.
    while num_started < 2 and (TaskStats.no_running_processes()
                               or num_started + cur_load < os.cpu_count()):
      with cls._lock:
        try:
          next_task = cls._queue.pop()
        except IndexError:
          return
      num_started += next_task.start(cls._maybe_start_tasks)


# TODO(wnwen): Break this into Request (encapsulating what ninja sends) and Task
#              when a Request starts to be run. This would eliminate ambiguity
#              about when and whether _proc/_thread are initialized.
class Task:
  """Class to represent one task and operations on it."""

  def __init__(self, name: str, cwd: str, cmd: List[str], tty: IO[str],
               stamp_file: str, build_id: str, remote_print: bool, options):
    self.name = name
    self.cwd = cwd
    self.cmd = cmd
    self.stamp_file = stamp_file
    self.tty = tty
    self.build_id = build_id
    self.remote_print = remote_print
    self.options = options
    self._terminated = False
    self._replaced = False
    self._lock = threading.RLock()
    self._proc: Optional[subprocess.Popen] = None
    self._thread: Optional[threading.Thread] = None
    self._delete_stamp_thread: Optional[threading.Thread] = None
    self._return_code: Optional[int] = None

  @property
  def key(self):
    return (self.cwd, self.name)

  def __eq__(self, other):
    return self.key == other.key and self.build_id == other.build_id

  def start(self, on_complete_callback: Callable[[], None]) -> int:
    """Starts the task if it has not already been terminated.

    Returns the number of processes that have been started. This is called at
    most once when the task is popped off the task queue."""

    # The environment variable forces the script to actually run in order to
    # avoid infinite recursion.
    env = os.environ.copy()
    env[server_utils.BUILD_SERVER_ENV_VARIABLE] = '1'

    with self._lock:
      if self._terminated:
        return 0

      # Use os.nice(19) to ensure the lowest priority (idle) for these analysis
      # tasks since we want to avoid slowing down the actual build.
      # TODO(wnwen): Use ionice to reduce resource consumption.
      TaskStats.add_process(self.build_id)
      set_status(f'STARTING {self.name}',
                 quiet=self.options.quiet,
                 build_id=self.build_id)
      # This use of preexec_fn is sufficiently simple, just one os.nice call.
      # pylint: disable=subprocess-popen-preexec-fn
      self._proc = subprocess.Popen(
          self.cmd,
          stdout=subprocess.PIPE,
          stderr=subprocess.STDOUT,
          cwd=self.cwd,
          env=env,
          text=True,
          preexec_fn=lambda: os.nice(19),
      )
      self._thread = threading.Thread(
          target=self._complete_when_process_finishes,
          args=(on_complete_callback, ))
      self._thread.start()
      return 1

  def terminate(self, replaced=False):
    """Can be called multiple times to cancel and ignore the task's output."""

    with self._lock:
      if self._terminated:
        return
      self._terminated = True
      self._replaced = replaced

    # It is safe to access _proc and _thread outside of _lock since they are
    # only changed by self.start holding _lock while self._terminated is false.
    # Since we have just set self._terminated to true inside of _lock, we know
    # that neither _proc nor _thread will be changed from this point onwards.
    if self._proc:
      self._proc.terminate()
      self._proc.wait()
    # Ensure that self._complete is called either by the thread or by us.
    if self._thread:
      self._thread.join()
    else:
      self._complete()

  def _complete_when_process_finishes(self,
                                      on_complete_callback: Callable[[], None]):
    assert self._proc
    # We know Popen.communicate will return a str and not bytes since the
    # process was constructed with text=True.
    stdout: str = self._proc.communicate()[0]
    self._return_code = self._proc.returncode
    TaskStats.remove_process(build_id=self.build_id)
    self._complete(stdout)
    on_complete_callback()

  def _complete(self, stdout: str = ''):
    """Update the user and ninja after the task has run or been terminated.

    This method should only be run once per task. Avoid modifying the task so
    that this method does not need locking."""

    TaskStats.complete_task(build_id=self.build_id)
    delete_stamp = False
    status_string = 'FINISHED'
    if self._terminated:
      status_string = 'TERMINATED'
      # When tasks are replaced, avoid deleting the stamp file, context:
      # https://issuetracker.google.com/301961827.
      if not self._replaced:
        delete_stamp = True
    elif stdout or self._return_code != 0:
      status_string = 'FAILED'
      delete_stamp = True
      preamble = [
          f'FAILED: {self.name}',
          f'Return code: {self._return_code}',
          'CMD: ' + ' '.join(self.cmd),
          'STDOUT:',
      ]

      message = '\n'.join(preamble + [stdout])
      log_to_file(message, build_id=self.build_id)
      log(message, quiet=self.options.quiet)
      if self.remote_print:
        # Add emoji to show that output is from the build server.
        preamble = [f'⏩ {line}' for line in preamble]
        self.tty.write('\n'.join(preamble + [stdout]))
        self.tty.flush()
    set_status(f'{status_string} {self.name}',
               quiet=self.options.quiet,
               build_id=self.build_id)
    if delete_stamp:
      # Force siso to consider failed targets as dirty.
      try:
        os.unlink(os.path.join(self.cwd, self.stamp_file))
      except FileNotFoundError:
        pass
    else:
      # We do not care if the action writes an mtime that is too new. Siso only
      # cares about the mtime that is recorded in its database at the time the
      # original action finished.
      pass


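# Fields of an ADD_TASK message, as read below (assumed from usage rather than
# from a formal schema): 'name', 'cwd', 'cmd', 'stamp_file', 'build_id', and
# optionally 'experimental' plus 'tty'.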
def _handle_add_task(data, current_tasks: Dict[Tuple[str, str], Task], options):
  """Handle messages of type ADD_TASK."""
  build_id = data['build_id']
  task_outdir = data['cwd']

  is_experimental = data.get('experimental', False)
  tty = None
  if is_experimental:
    tty = open(data['tty'], 'wt')
    BuildManager.register_tty(build_id, tty)

  # Make sure a logfile for the build_id exists.
  create_logfile(build_id, task_outdir)

  new_task = Task(name=data['name'],
                  cwd=task_outdir,
                  cmd=data['cmd'],
                  tty=tty,
                  build_id=build_id,
                  remote_print=is_experimental,
                  stamp_file=data['stamp_file'],
                  options=options)
  existing_task = current_tasks.get(new_task.key)
  if existing_task:
    existing_task.terminate(replaced=True)
  current_tasks[new_task.key] = new_task

  TaskManager.add_task(new_task, options)


def _handle_query_build(data, connection: socket.socket):
  """Handle messages of type QUERY_BUILD."""
  build_id = data['build_id']
  pending_tasks = TaskStats.num_pending_tasks(build_id)
  completed_tasks = TaskStats.num_completed_tasks(build_id)
  response = {
      'build_id': build_id,
      'completed_tasks': completed_tasks,
      'pending_tasks': pending_tasks,
  }
  try:
    with connection:
      server_utils.SendMessage(connection, json.dumps(response).encode('utf8'))
  except BrokenPipeError:
    # We should not die because the client died.
    pass


def _handle_heartbeat(connection: socket.socket):
  """Handle messages of type POLL_HEARTBEAT."""
  try:
    with connection:
      server_utils.SendMessage(connection,
                               json.dumps({
                                   'status': 'OK'
                               }).encode('utf8'))
  except BrokenPipeError:
    # We should not die because the client died.
    pass


def _handle_register_builder(data):
  """Handle messages of type REGISTER_BUILDER."""
  build_id = data['build_id']
  builder_pid = data['builder_pid']
  BuildManager.register_builder(build_id, builder_pid)


def _handle_cancel_build(data):
  """Handle messages of type CANCEL_BUILD."""
  build_id = data['build_id']
  TaskManager.cancel_build(build_id)


def _listen_for_request_data(sock: socket.socket):
  """Helper to encapsulate getting a new message."""
  while True:
    conn = sock.accept()[0]
    message_bytes = server_utils.ReceiveMessage(conn)
    if message_bytes:
      yield json.loads(message_bytes), conn


def _process_requests(sock: socket.socket, options):
  """Main loop for build server receiving request messages."""
  # Since dicts in python can contain anything, explicitly type tasks to help
  # make static type checking more useful.
  tasks: Dict[Tuple[str, str], Task] = {}
  log(
      'READY... Remember to set android_static_analysis="build_server" in '
      'args.gn files',
      quiet=options.quiet)
  # pylint: disable=too-many-nested-blocks
  try:
    while True:
      try:
        for data, connection in _listen_for_request_data(sock):
          message_type = data.get('message_type', server_utils.ADD_TASK)
          if message_type == server_utils.POLL_HEARTBEAT:
            _handle_heartbeat(connection)
          if message_type == server_utils.ADD_TASK:
            connection.close()
            _handle_add_task(data, tasks, options)
          if message_type == server_utils.QUERY_BUILD:
            _handle_query_build(data, connection)
          if message_type == server_utils.REGISTER_BUILDER:
            connection.close()
            _handle_register_builder(data)
          if message_type == server_utils.CANCEL_BUILD:
            connection.close()
            _handle_cancel_build(data)
      except TimeoutError:
        # If we have not received a new task in a while and do not have any
        # pending tasks or running builds, then exit. Otherwise keep waiting.
        if (TaskStats.num_pending_tasks() == 0
            and not BuildManager.has_live_builds() and options.exit_on_idle):
          break
      except KeyboardInterrupt:
        break
  finally:
    log('STOPPING SERVER...', quiet=options.quiet)
    # Gracefully shut down the task manager, terminating all queued tasks.
    TaskManager.deactivate()
    # Terminate all currently running tasks.
    for task in tasks.values():
      task.terminate()
    log('STOPPED', quiet=options.quiet)


def query_build_info(build_id):
  """Communicates with the main server to query build info."""
  with contextlib.closing(socket.socket(socket.AF_UNIX)) as sock:
    sock.connect(server_utils.SOCKET_ADDRESS)
    sock.settimeout(3)
    server_utils.SendMessage(
        sock,
        json.dumps({
            'message_type': server_utils.QUERY_BUILD,
            'build_id': build_id,
        }).encode('utf8'))
    response_bytes = server_utils.ReceiveMessage(sock)
    return json.loads(response_bytes)


def _wait_for_build(build_id):
  """Communicates with the main server, waiting for a build to complete."""
  start_time = datetime.datetime.now()
  while True:
    build_info = query_build_info(build_id)
    pending_tasks = build_info['pending_tasks']

    if pending_tasks == 0:
      print(f'\nAll tasks completed for build_id: {build_id}.')
      return 0

    current_time = datetime.datetime.now()
    duration = current_time - start_time
    print(f'\rWaiting for {pending_tasks} tasks [{str(duration)}]\033[K',
          end='',
          flush=True)
    time.sleep(1)


def _check_if_running():
  """Communicates with the main server to make sure it's running."""
  with socket.socket(socket.AF_UNIX) as sock:
    try:
      sock.connect(server_utils.SOCKET_ADDRESS)
    except socket.error:
      print('Build server is not running and '
            'android_static_analysis="build_server" is set.\nPlease run '
            'this command in a separate terminal:\n\n'
            '$ build/android/fast_local_dev_server.py\n')
      return 1
    else:
      return 0


def _send_message_and_close(message_dict):
  with contextlib.closing(socket.socket(socket.AF_UNIX)) as sock:
    sock.connect(server_utils.SOCKET_ADDRESS)
    sock.settimeout(3)
    server_utils.SendMessage(sock, json.dumps(message_dict).encode('utf8'))


def _send_cancel_build(build_id):
  _send_message_and_close({
      'message_type': server_utils.CANCEL_BUILD,
      'build_id': build_id,
  })
  return 0


def _register_builder(build_id, builder_pid):
  for _attempt in range(3):
    try:
      _send_message_and_close({
          'message_type': server_utils.REGISTER_BUILDER,
          'build_id': build_id,
          'builder_pid': builder_pid,
      })
      return 0
    except socket.error:
      time.sleep(0.05)
  print(f'Failed to register builder for build_id={build_id}.')
  return 1


def _wait_for_task_requests(args):
  with socket.socket(socket.AF_UNIX) as sock:
    sock.settimeout(_SOCKET_TIMEOUT)
    try:
      sock.bind(server_utils.SOCKET_ADDRESS)
    except socket.error as e:
      # errno 98 is Address already in use
      if e.errno == 98:
        print('fast_local_dev_server.py is already running.')
        return 1
      raise
    sock.listen()
    _process_requests(sock, args)
  return 0


def main():
  parser = argparse.ArgumentParser(description=__doc__)
  parser.add_argument(
      '--fail-if-not-running',
      action='store_true',
      help='Used by GN to fail fast if the build server is not running.')
  parser.add_argument(
      '--exit-on-idle',
      action='store_true',
      help='Server started on demand. Exit when all tasks run out.')
  parser.add_argument('--quiet',
                      action='store_true',
                      help='Do not output status updates.')
  parser.add_argument('--wait-for-build',
                      metavar='BUILD_ID',
                      help='Wait for build server to finish with all tasks '
                      'for BUILD_ID and output any pending messages.')
  parser.add_argument(
      '--register-build-id',
      metavar='BUILD_ID',
      help='Inform the build server that a new build has started.')
  parser.add_argument('--builder-pid',
                      help='Builder process\'s pid for build BUILD_ID.')
  parser.add_argument('--cancel-build',
                      metavar='BUILD_ID',
                      help='Cancel all pending and running tasks for BUILD_ID.')
  args = parser.parse_args()
  if args.fail_if_not_running:
    return _check_if_running()
  if args.wait_for_build:
    return _wait_for_build(args.wait_for_build)
  if args.register_build_id:
    return _register_builder(args.register_build_id, args.builder_pid)
  if args.cancel_build:
    return _send_cancel_build(args.cancel_build)
  return _wait_for_task_requests(args)


if __name__ == '__main__':
  sys.excepthook = _exception_hook
  sys.exit(main())