1#!/usr/bin/env python3 2# Copyright 2021 The Chromium Authors 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5"""Creates an server to offload non-critical-path GN targets.""" 6 7from __future__ import annotations 8 9import argparse 10import collections 11import contextlib 12import dataclasses 13import datetime 14import os 15import pathlib 16import re 17import signal 18import shlex 19import shutil 20import socket 21import subprocess 22import sys 23import threading 24import traceback 25import time 26from typing import Callable, Dict, List, Optional, Tuple, IO 27 28sys.path.append(os.path.join(os.path.dirname(__file__), 'gyp')) 29from util import server_utils 30 31_SOCKET_TIMEOUT = 60 # seconds 32 33_LOGFILE_NAME = 'buildserver.log' 34_MAX_LOGFILES = 6 35 36FIRST_LOG_LINE = """\ 37#### Start of log for build: {build_id} 38#### CWD: {outdir} 39""" 40BUILD_ID_RE = re.compile(r'^#### Start of log for build: (?P<build_id>.+)') 41 42 43def server_log(msg: str): 44 if OptionsManager.is_quiet(): 45 return 46 # Ensure we start our message on a new line. 47 print('\n' + msg) 48 49 50def print_status(prefix: str, msg: str): 51 # No need to also output to the terminal if quiet. 52 if OptionsManager.is_quiet(): 53 return 54 # Shrink the message (leaving a 2-char prefix and use the rest of the room 55 # for the suffix) according to terminal size so it is always one line. 56 width = shutil.get_terminal_size().columns 57 max_msg_width = width - len(prefix) 58 if len(msg) > max_msg_width: 59 length_to_show = max_msg_width - 5 # Account for ellipsis and header. 60 msg = f'{msg[:2]}...{msg[-length_to_show:]}' 61 # \r to return the carriage to the beginning of line. 62 # \033[K to replace the normal \n to erase until the end of the line. 63 # Avoid the default line ending so the next \r overwrites the same line just 64 # like ninja's output. 
65 print(f'\r{prefix}{msg}\033[K', end='', flush=True) 66 67 68def _exception_hook(exctype: type, exc: Exception, tb): 69 # Let KeyboardInterrupt through. 70 if issubclass(exctype, KeyboardInterrupt): 71 sys.__excepthook__(exctype, exc, tb) 72 return 73 stacktrace = ''.join(traceback.format_exception(exctype, exc, tb)) 74 stacktrace_lines = [f'\n⛔{line}' for line in stacktrace.splitlines()] 75 # Output uncaught exceptions to all live terminals 76 # Extra newline since siso's output often erases the current line. 77 BuildManager.broadcast(''.join(stacktrace_lines) + '\n') 78 # Cancel all pending tasks cleanly (i.e. delete stamp files if necessary). 79 TaskManager.deactivate() 80 # Reset all remote terminal titles. 81 BuildManager.update_remote_titles('') 82 83 84# Stores global options so as to not keep passing along and storing options 85# everywhere. 86class OptionsManager: 87 _options = None 88 89 @classmethod 90 def set_options(cls, options): 91 cls._options = options 92 93 @classmethod 94 def is_quiet(cls): 95 assert cls._options is not None 96 return cls._options.quiet 97 98 @classmethod 99 def should_remote_print(cls): 100 assert cls._options is not None 101 return not cls._options.no_remote_print 102 103 104class LogfileManager: 105 _logfiles: dict[str, IO[str]] = {} 106 _lock = threading.RLock() 107 108 @classmethod 109 def create_logfile(cls, build_id, outdir): 110 with cls._lock: 111 if logfile := cls._logfiles.get(build_id, None): 112 return logfile 113 114 outdir = pathlib.Path(outdir) 115 latest_logfile = outdir / f'{_LOGFILE_NAME}.0' 116 117 if latest_logfile.exists(): 118 with latest_logfile.open('rt') as f: 119 first_line = f.readline() 120 if log_build_id := BUILD_ID_RE.search(first_line): 121 # If the newest logfile on disk is referencing the same build we are 122 # currently processing, we probably crashed previously and we should 123 # pick up where we left off in the same logfile. 
124 if log_build_id.group('build_id') == build_id: 125 cls._logfiles[build_id] = latest_logfile.open('at') 126 return cls._logfiles[build_id] 127 128 # Do the logfile name shift. 129 filenames = os.listdir(outdir) 130 logfiles = {f for f in filenames if f.startswith(_LOGFILE_NAME)} 131 for idx in reversed(range(_MAX_LOGFILES)): 132 current_name = f'{_LOGFILE_NAME}.{idx}' 133 next_name = f'{_LOGFILE_NAME}.{idx+1}' 134 if current_name in logfiles: 135 shutil.move(os.path.join(outdir, current_name), 136 os.path.join(outdir, next_name)) 137 138 # Create a new 0th logfile. 139 logfile = latest_logfile.open('wt') 140 logfile.write(FIRST_LOG_LINE.format(build_id=build_id, outdir=outdir)) 141 logfile.flush() 142 cls._logfiles[build_id] = logfile 143 return logfile 144 145 146class TaskStats: 147 """Class to keep track of aggregate stats for all tasks across threads.""" 148 _num_processes = 0 149 _completed_tasks = 0 150 _total_tasks = 0 151 _lock = threading.RLock() 152 153 @classmethod 154 def no_running_processes(cls): 155 with cls._lock: 156 return cls._num_processes == 0 157 158 @classmethod 159 def add_task(cls): 160 with cls._lock: 161 cls._total_tasks += 1 162 163 @classmethod 164 def add_process(cls): 165 with cls._lock: 166 cls._num_processes += 1 167 168 @classmethod 169 def remove_process(cls): 170 with cls._lock: 171 cls._num_processes -= 1 172 173 @classmethod 174 def complete_task(cls): 175 with cls._lock: 176 cls._completed_tasks += 1 177 178 @classmethod 179 def num_pending_tasks(cls): 180 with cls._lock: 181 return cls._total_tasks - cls._completed_tasks 182 183 @classmethod 184 def num_completed_tasks(cls): 185 with cls._lock: 186 return cls._completed_tasks 187 188 @classmethod 189 def total_tasks(cls): 190 with cls._lock: 191 return cls._total_tasks 192 193 @classmethod 194 def get_title_message(cls): 195 with cls._lock: 196 return f'Analysis Steps: {cls._completed_tasks}/{cls._total_tasks}' 197 198 @classmethod 199 def query_build(cls, query_build_id: 
str = None): 200 builds = [] 201 if query_build_id: 202 if build := BuildManager.get_build(query_build_id): 203 builds.append(build) 204 else: 205 builds = BuildManager.get_all_builds() 206 build_infos = [] 207 for build in builds: 208 build_infos.append(build.query_build_info()) 209 return { 210 'pid': os.getpid(), 211 'builds': build_infos, 212 } 213 214 @classmethod 215 def prefix(cls, build_id: str = None): 216 # Ninja's prefix is: [205 processes, 6/734 @ 6.5/s : 0.922s ] 217 # Time taken and task completion rate are not important for the build server 218 # since it is always running in the background and uses idle priority for 219 # its tasks. 220 with cls._lock: 221 if build_id: 222 build = BuildManager.get_build(build_id) 223 _num_processes = build.process_count() 224 _completed_tasks = build.completed_task_count() 225 _total_tasks = build.total_task_count() 226 else: 227 _num_processes = cls._num_processes 228 _completed_tasks = cls._completed_tasks 229 _total_tasks = cls._total_tasks 230 word = 'process' if _num_processes == 1 else 'processes' 231 return (f'{_num_processes} {word}, ' 232 f'{_completed_tasks}/{_total_tasks}') 233 234 235def check_pid_alive(pid: int): 236 try: 237 os.kill(pid, 0) 238 except OSError: 239 return False 240 return True 241 242 243@dataclasses.dataclass 244class Build: 245 id: str 246 pid: int 247 env: dict 248 stdout: IO[str] 249 cwd: Optional[str] = None 250 _logfile: Optional[IO[str]] = None 251 _is_ninja_alive: bool = True 252 _tasks: List[Task] = dataclasses.field(default_factory=list) 253 _completed_task_count = 0 254 _active_process_count = 0 255 _lock: threading.RLock = dataclasses.field(default_factory=threading.RLock, 256 repr=False, 257 init=False) 258 259 def __hash__(self): 260 return hash((self.id, self.pid, self.cwd)) 261 262 def add_task(self, task: Task): 263 self._status_update(f'QUEUED {task.name}') 264 with self._lock: 265 self._tasks.append(task) 266 TaskStats.add_task() 267 TaskManager.add_task(task) 268 269 
def add_process(self, task: Task): 270 self._status_update(f'STARTING {task.name}') 271 with self._lock: 272 self._active_process_count += 1 273 TaskStats.add_process() 274 275 def task_done(self, task: Task, status_string: str): 276 self._status_update(f'{status_string} {task.name}') 277 TaskStats.complete_task() 278 TaskManager.task_done(task) 279 with self._lock: 280 self._completed_task_count += 1 281 282 # We synchronize all terminal title info rather than having it per build 283 # since if two builds are happening in the same terminal concurrently, both 284 # builds will be overriding each other's titles continuously. Usually we 285 # only have the one build anyways so it should equivalent in most cases. 286 BuildManager.update_remote_titles() 287 with self._lock: 288 if not self.is_active(): 289 self._logfile.close() 290 # Reset in case its the last build. 291 BuildManager.update_remote_titles('') 292 293 def process_complete(self): 294 with self._lock: 295 self._active_process_count -= 1 296 TaskStats.remove_process() 297 298 def ensure_logfile(self): 299 with self._lock: 300 if not self._logfile: 301 assert self.cwd is not None 302 self._logfile = LogfileManager.create_logfile(self.id, self.cwd) 303 304 def log(self, message: str): 305 with self._lock: 306 self.ensure_logfile() 307 if self._logfile.closed: 308 # BuildManager#broadcast can call log after the build is done and the 309 # log is closed. Might make sense to separate out that flow so we can 310 # raise an exception here otherwise. 
311 return 312 print(message, file=self._logfile, flush=True) 313 314 def _status_update(self, status_message): 315 prefix = f'[{TaskStats.prefix(self.id)}] ' 316 self.log(f'{prefix}{status_message}') 317 print_status(prefix, status_message) 318 319 def total_task_count(self): 320 with self._lock: 321 return len(self._tasks) 322 323 def completed_task_count(self): 324 with self._lock: 325 return self._completed_task_count 326 327 def pending_task_count(self): 328 with self._lock: 329 return self.total_task_count() - self.completed_task_count() 330 331 def process_count(self): 332 with self._lock: 333 return self._active_process_count 334 335 def is_active(self): 336 if self.pending_task_count() > 0: 337 return True 338 # Ninja is not coming back to life so only check on it if last we checked it 339 # was still alive. 340 if self._is_ninja_alive: 341 self._is_ninja_alive = check_pid_alive(self.pid) 342 return self._is_ninja_alive 343 344 def query_build_info(self): 345 current_tasks = TaskManager.get_current_tasks(self.id) 346 return { 347 'build_id': self.id, 348 'is_active': self.is_active(), 349 'completed_tasks': self.completed_task_count(), 350 'pending_tasks': self.pending_task_count(), 351 'active_tasks': [t.cmd for t in current_tasks], 352 'outdir': self.cwd, 353 } 354 355 356class BuildManager: 357 _builds_by_id: dict[str, Build] = dict() 358 _cached_ttys: dict[(int, int), tuple[IO[str], bool]] = dict() 359 _lock = threading.RLock() 360 361 @classmethod 362 def register_builder(cls, env, pid, cwd): 363 build_id = env['AUTONINJA_BUILD_ID'] 364 stdout = cls.open_tty(env['AUTONINJA_STDOUT_NAME']) 365 # Tells the script not to re-delegate to build server. 
366 env[server_utils.BUILD_SERVER_ENV_VARIABLE] = '1' 367 368 with cls._lock: 369 build = Build(id=build_id, 370 pid=pid, 371 cwd=cwd, 372 env=env, 373 stdout=stdout) 374 cls.maybe_init_cwd(build, cwd) 375 cls._builds_by_id[build_id] = build 376 cls.update_remote_titles() 377 378 @classmethod 379 def maybe_init_cwd(cls, build: Build, cwd: str): 380 if cwd is not None: 381 with cls._lock: 382 if build.cwd is None: 383 build.cwd = cwd 384 else: 385 assert pathlib.Path(cwd).samefile( 386 build.cwd), f'{repr(cwd)} != {repr(build.cwd)}' 387 build.ensure_logfile() 388 389 @classmethod 390 def get_build(cls, build_id): 391 with cls._lock: 392 return cls._builds_by_id.get(build_id, None) 393 394 @classmethod 395 def open_tty(cls, tty_path): 396 # Do not open the same tty multiple times. Use st_ino and st_dev to compare 397 # file descriptors. 398 tty = open(tty_path, 'at') 399 st = os.stat(tty.fileno()) 400 tty_key = (st.st_ino, st.st_dev) 401 with cls._lock: 402 # Dedupes ttys 403 if tty_key not in cls._cached_ttys: 404 # TTYs are kept open for the lifetime of the server so that broadcast 405 # messages (e.g. uncaught exceptions) can be sent to them even if they 406 # are not currently building anything. 
407 cls._cached_ttys[tty_key] = (tty, tty.isatty()) 408 else: 409 tty.close() 410 return cls._cached_ttys[tty_key][0] 411 412 @classmethod 413 def get_active_builds(cls) -> List[Build]: 414 builds = cls.get_all_builds() 415 return list(build for build in builds if build.is_active()) 416 417 @classmethod 418 def get_all_builds(cls) -> List[Build]: 419 with cls._lock: 420 return list(cls._builds_by_id.values()) 421 422 @classmethod 423 def broadcast(cls, msg: str): 424 with cls._lock: 425 ttys = list(cls._cached_ttys.values()) 426 builds = list(cls._builds_by_id.values()) 427 if OptionsManager.should_remote_print(): 428 for tty, _unused in ttys: 429 try: 430 tty.write(msg + '\n') 431 tty.flush() 432 except BrokenPipeError: 433 pass 434 for build in builds: 435 build.log(msg) 436 # Write to the current terminal if we have not written to it yet. 437 st = os.stat(sys.stderr.fileno()) 438 stderr_key = (st.st_ino, st.st_dev) 439 if stderr_key not in cls._cached_ttys: 440 print(msg, file=sys.stderr) 441 442 @classmethod 443 def update_remote_titles(cls, new_title=None): 444 if new_title is None: 445 if not cls.has_active_builds() and TaskStats.num_pending_tasks() == 0: 446 # Setting an empty title causes most terminals to go back to the 447 # default title (and at least prevents the tab title from being 448 # "Analysis Steps: N/N" forevermore. 
449 new_title = '' 450 else: 451 new_title = TaskStats.get_title_message() 452 453 with cls._lock: 454 ttys = list(cls._cached_ttys.values()) 455 for tty, isatty in ttys: 456 if isatty: 457 try: 458 tty.write(f'\033]2;{new_title}\007') 459 tty.flush() 460 except BrokenPipeError: 461 pass 462 463 @classmethod 464 def has_active_builds(cls): 465 return bool(cls.get_active_builds()) 466 467 468class TaskManager: 469 """Class to encapsulate a threadsafe queue and handle deactivating it.""" 470 _queue: collections.deque[Task] = collections.deque() 471 _current_tasks: set[Task] = set() 472 _deactivated = False 473 _lock = threading.RLock() 474 475 @classmethod 476 def add_task(cls, task: Task): 477 assert not cls._deactivated 478 with cls._lock: 479 cls._queue.appendleft(task) 480 cls._maybe_start_tasks() 481 482 @classmethod 483 def task_done(cls, task: Task): 484 with cls._lock: 485 cls._current_tasks.discard(task) 486 487 @classmethod 488 def get_current_tasks(cls, build_id): 489 with cls._lock: 490 return [t for t in cls._current_tasks if t.build.id == build_id] 491 492 @classmethod 493 def deactivate(cls): 494 cls._deactivated = True 495 tasks_to_terminate: list[Task] = [] 496 with cls._lock: 497 while cls._queue: 498 task = cls._queue.pop() 499 tasks_to_terminate.append(task) 500 # Cancel possibly running tasks. 501 tasks_to_terminate.extend(cls._current_tasks) 502 # Terminate outside lock since task threads need the lock to finish 503 # terminating. 504 for task in tasks_to_terminate: 505 task.terminate() 506 507 @classmethod 508 def cancel_build(cls, build_id): 509 terminated_pending_tasks: list[Task] = [] 510 terminated_current_tasks: list[Task] = [] 511 with cls._lock: 512 # Cancel pending tasks. 513 for task in cls._queue: 514 if task.build.id == build_id: 515 terminated_pending_tasks.append(task) 516 for task in terminated_pending_tasks: 517 cls._queue.remove(task) 518 # Cancel running tasks. 
519 for task in cls._current_tasks: 520 if task.build.id == build_id: 521 terminated_current_tasks.append(task) 522 # Terminate tasks outside lock since task threads need the lock to finish 523 # terminating. 524 for task in terminated_pending_tasks: 525 task.terminate() 526 for task in terminated_current_tasks: 527 task.terminate() 528 529 @staticmethod 530 # pylint: disable=inconsistent-return-statements 531 def _num_running_processes(): 532 with open('/proc/stat') as f: 533 for line in f: 534 if line.startswith('procs_running'): 535 return int(line.rstrip().split()[1]) 536 assert False, 'Could not read /proc/stat' 537 538 @classmethod 539 def _maybe_start_tasks(cls): 540 if cls._deactivated: 541 return 542 # Include load avg so that a small dip in the number of currently running 543 # processes will not cause new tasks to be started while the overall load is 544 # heavy. 545 cur_load = max(cls._num_running_processes(), os.getloadavg()[0]) 546 num_started = 0 547 # Always start a task if we don't have any running, so that all tasks are 548 # eventually finished. Try starting up tasks when the overall load is light. 549 # Limit to at most 2 new tasks to prevent ramping up too fast. There is a 550 # chance where multiple threads call _maybe_start_tasks and each gets to 551 # spawn up to 2 new tasks, but since the only downside is some build tasks 552 # get worked on earlier rather than later, it is not worth mitigating. 553 while num_started < 2 and (TaskStats.no_running_processes() 554 or num_started + cur_load < os.cpu_count()): 555 with cls._lock: 556 try: 557 next_task = cls._queue.pop() 558 cls._current_tasks.add(next_task) 559 except IndexError: 560 return 561 num_started += next_task.start(cls._maybe_start_tasks) 562 563 564# TODO(wnwen): Break this into Request (encapsulating what ninja sends) and Task 565# when a Request starts to be run. This would eliminate ambiguity 566# about when and whether _proc/_thread are initialized. 
567class Task: 568 """Class to represent one task and operations on it.""" 569 570 def __init__(self, name: str, build: Build, cmd: List[str], stamp_file: str): 571 self.name = name 572 self.build = build 573 self.cmd = cmd 574 self.stamp_file = stamp_file 575 self._terminated = False 576 self._replaced = False 577 self._lock = threading.RLock() 578 self._proc: Optional[subprocess.Popen] = None 579 self._thread: Optional[threading.Thread] = None 580 self._delete_stamp_thread: Optional[threading.Thread] = None 581 self._return_code: Optional[int] = None 582 583 @property 584 def key(self): 585 return (self.build.cwd, self.name) 586 587 def __hash__(self): 588 return hash((self.key, self.build.id)) 589 590 def __eq__(self, other): 591 return self.key == other.key and self.build is other.build 592 593 def start(self, on_complete_callback: Callable[[], None]) -> int: 594 """Starts the task if it has not already been terminated. 595 596 Returns the number of processes that have been started. This is called at 597 most once when the task is popped off the task queue.""" 598 with self._lock: 599 if self._terminated: 600 return 0 601 602 # Use os.nice(19) to ensure the lowest priority (idle) for these analysis 603 # tasks since we want to avoid slowing down the actual build. 604 # TODO(wnwen): Use ionice to reduce resource consumption. 605 self.build.add_process(self) 606 # This use of preexec_fn is sufficiently simple, just one os.nice call. 
607 # pylint: disable=subprocess-popen-preexec-fn 608 self._proc = subprocess.Popen( 609 self.cmd, 610 stdout=subprocess.PIPE, 611 stderr=subprocess.STDOUT, 612 cwd=self.build.cwd, 613 env=self.build.env, 614 text=True, 615 preexec_fn=lambda: os.nice(19), 616 ) 617 self._thread = threading.Thread( 618 target=self._complete_when_process_finishes, 619 args=(on_complete_callback, )) 620 self._thread.start() 621 return 1 622 623 def terminate(self, replaced=False): 624 """Can be called multiple times to cancel and ignore the task's output.""" 625 with self._lock: 626 if self._terminated: 627 return 628 self._terminated = True 629 self._replaced = replaced 630 631 # It is safe to access _proc and _thread outside of _lock since they are 632 # only changed by self.start holding _lock when self._terminate is false. 633 # Since we have just set self._terminate to true inside of _lock, we know 634 # that neither _proc nor _thread will be changed from this point onwards. 635 if self._proc: 636 self._proc.terminate() 637 self._proc.wait() 638 # Ensure that self._complete is called either by the thread or by us. 639 if self._thread: 640 self._thread.join() 641 else: 642 self._complete() 643 644 def _complete_when_process_finishes(self, 645 on_complete_callback: Callable[[], None]): 646 assert self._proc 647 # We know Popen.communicate will return a str and not a byte since it is 648 # constructed with text=True. 649 stdout: str = self._proc.communicate()[0] 650 self._return_code = self._proc.returncode 651 self.build.process_complete() 652 self._complete(stdout) 653 on_complete_callback() 654 655 def _complete(self, stdout: str = ''): 656 """Update the user and ninja after the task has run or been terminated. 657 658 This method should only be run once per task. 
Avoid modifying the task so 659 that this method does not need locking.""" 660 661 delete_stamp = False 662 status_string = 'FINISHED' 663 if self._terminated: 664 status_string = 'TERMINATED' 665 # When tasks are replaced, avoid deleting the stamp file, context: 666 # https://issuetracker.google.com/301961827. 667 if not self._replaced: 668 delete_stamp = True 669 elif stdout or self._return_code != 0: 670 status_string = 'FAILED' 671 delete_stamp = True 672 preamble = [ 673 f'FAILED: {self.name}', 674 f'Return code: {self._return_code}', 675 'CMD: ' + shlex.join(self.cmd), 676 'STDOUT:', 677 ] 678 679 message = '\n'.join(preamble + [stdout]) 680 self.build.log(message) 681 server_log(message) 682 683 if OptionsManager.should_remote_print(): 684 # Add emoji to show that output is from the build server. 685 preamble = [f'⏩ {line}' for line in preamble] 686 remote_message = '\n'.join(preamble + [stdout]) 687 # Add a new line at start of message to clearly delineate from previous 688 # output/text already on the remote tty we are printing to. 689 self.build.stdout.write(f'\n{remote_message}') 690 self.build.stdout.flush() 691 if delete_stamp: 692 # Force siso to consider failed targets as dirty. 
693 try: 694 os.unlink(os.path.join(self.build.cwd, self.stamp_file)) 695 except FileNotFoundError: 696 pass 697 self.build.task_done(self, status_string) 698 699 700def _handle_add_task(data, current_tasks: Dict[Tuple[str, str], Task]): 701 """Handle messages of type ADD_TASK.""" 702 build_id = data['build_id'] 703 build = BuildManager.get_build(build_id) 704 BuildManager.maybe_init_cwd(build, data.get('cwd')) 705 706 new_task = Task(name=data['name'], 707 cmd=data['cmd'], 708 build=build, 709 stamp_file=data['stamp_file']) 710 existing_task = current_tasks.get(new_task.key) 711 if existing_task: 712 existing_task.terminate(replaced=True) 713 current_tasks[new_task.key] = new_task 714 715 build.add_task(new_task) 716 717 718def _handle_query_build(data, connection: socket.socket): 719 """Handle messages of type QUERY_BUILD.""" 720 build_id = data['build_id'] 721 response = TaskStats.query_build(build_id) 722 try: 723 with connection: 724 server_utils.SendMessage(connection, response) 725 except BrokenPipeError: 726 # We should not die because the client died. 727 pass 728 729 730def _handle_heartbeat(connection: socket.socket): 731 """Handle messages of type POLL_HEARTBEAT.""" 732 try: 733 with connection: 734 server_utils.SendMessage(connection, { 735 'status': 'OK', 736 'pid': os.getpid(), 737 }) 738 except BrokenPipeError: 739 # We should not die because the client died. 
740 pass 741 742 743def _handle_register_builder(data): 744 """Handle messages of type REGISTER_BUILDER.""" 745 env = data['env'] 746 pid = int(data['builder_pid']) 747 cwd = data['cwd'] 748 749 BuildManager.register_builder(env, pid, cwd) 750 751 752def _handle_cancel_build(data): 753 """Handle messages of type CANCEL_BUILD.""" 754 build_id = data['build_id'] 755 TaskManager.cancel_build(build_id) 756 BuildManager.update_remote_titles('') 757 758 759def _listen_for_request_data(sock: socket.socket): 760 """Helper to encapsulate getting a new message.""" 761 while True: 762 conn = sock.accept()[0] 763 message = server_utils.ReceiveMessage(conn) 764 if message: 765 yield message, conn 766 767 768def _register_cleanup_signal_handlers(): 769 original_sigint_handler = signal.getsignal(signal.SIGINT) 770 original_sigterm_handler = signal.getsignal(signal.SIGTERM) 771 772 def _cleanup(signum, frame): 773 server_log('STOPPING SERVER...') 774 # Gracefully shut down the task manager, terminating all queued tasks. 775 TaskManager.deactivate() 776 server_log('STOPPED') 777 if signum == signal.SIGINT: 778 if callable(original_sigint_handler): 779 original_sigint_handler(signum, frame) 780 else: 781 raise KeyboardInterrupt() 782 if signum == signal.SIGTERM: 783 # Sometimes sigterm handler is not a callable. 784 if callable(original_sigterm_handler): 785 original_sigterm_handler(signum, frame) 786 else: 787 sys.exit(1) 788 789 signal.signal(signal.SIGINT, _cleanup) 790 signal.signal(signal.SIGTERM, _cleanup) 791 792 793def _process_requests(sock: socket.socket, exit_on_idle: bool): 794 """Main loop for build server receiving request messages.""" 795 # Since dicts in python can contain anything, explicitly type tasks to help 796 # make static type checking more useful. 797 tasks: Dict[Tuple[str, str], Task] = {} 798 server_log( 799 'READY... 
Remember to set android_static_analysis="build_server" in ' 800 'args.gn files') 801 _register_cleanup_signal_handlers() 802 # pylint: disable=too-many-nested-blocks 803 while True: 804 try: 805 for data, connection in _listen_for_request_data(sock): 806 message_type = data.get('message_type', server_utils.ADD_TASK) 807 if message_type == server_utils.POLL_HEARTBEAT: 808 _handle_heartbeat(connection) 809 elif message_type == server_utils.ADD_TASK: 810 connection.close() 811 _handle_add_task(data, tasks) 812 elif message_type == server_utils.QUERY_BUILD: 813 _handle_query_build(data, connection) 814 elif message_type == server_utils.REGISTER_BUILDER: 815 connection.close() 816 _handle_register_builder(data) 817 elif message_type == server_utils.CANCEL_BUILD: 818 connection.close() 819 _handle_cancel_build(data) 820 else: 821 connection.close() 822 except TimeoutError: 823 # If we have not received a new task in a while and do not have any 824 # pending tasks or running builds, then exit. Otherwise keep waiting. 825 if (TaskStats.num_pending_tasks() == 0 826 and not BuildManager.has_active_builds() and exit_on_idle): 827 break 828 except KeyboardInterrupt: 829 break 830 BuildManager.update_remote_titles('') 831 832 833def query_build_info(build_id=None): 834 """Communicates with the main server to query build info.""" 835 return _send_message_with_response({ 836 'message_type': server_utils.QUERY_BUILD, 837 'build_id': build_id, 838 }) 839 840 841def _wait_for_build(build_id): 842 """Comunicates with the main server waiting for a build to complete.""" 843 start_time = datetime.datetime.now() 844 while True: 845 try: 846 build_info = query_build_info(build_id)['builds'][0] 847 except ConnectionRefusedError: 848 print('No server running. 
It likely finished all tasks.') 849 print('You can check $OUTDIR/buildserver.log.0 to be sure.') 850 return 0 851 852 pending_tasks = build_info['pending_tasks'] 853 854 if pending_tasks == 0: 855 print(f'\nAll tasks completed for build_id: {build_id}.') 856 return 0 857 858 current_time = datetime.datetime.now() 859 duration = current_time - start_time 860 print(f'\rWaiting for {pending_tasks} tasks [{str(duration)}]\033[K', 861 end='', 862 flush=True) 863 time.sleep(1) 864 865 866def _wait_for_idle(): 867 """Communicates with the main server waiting for all builds to complete.""" 868 start_time = datetime.datetime.now() 869 while True: 870 try: 871 builds = query_build_info()['builds'] 872 except ConnectionRefusedError: 873 print('No server running. It likely finished all tasks.') 874 print('You can check $OUTDIR/buildserver.log.0 to be sure.') 875 return 0 876 877 all_pending_tasks = 0 878 all_completed_tasks = 0 879 for build_info in builds: 880 pending_tasks = build_info['pending_tasks'] 881 completed_tasks = build_info['completed_tasks'] 882 active = build_info['is_active'] 883 # Ignore completed builds. 884 if active or pending_tasks: 885 all_pending_tasks += pending_tasks 886 all_completed_tasks += completed_tasks 887 total_tasks = all_pending_tasks + all_completed_tasks 888 889 if all_pending_tasks == 0: 890 print('\nServer Idle, All tasks complete.') 891 return 0 892 893 current_time = datetime.datetime.now() 894 duration = current_time - start_time 895 print( 896 f'\rWaiting for {all_pending_tasks} remaining tasks. 
' 897 f'({all_completed_tasks}/{total_tasks} tasks complete) ' 898 f'[{str(duration)}]\033[K', 899 end='', 900 flush=True) 901 time.sleep(0.5) 902 903 904def _check_if_running(): 905 """Communicates with the main server to make sure its running.""" 906 with socket.socket(socket.AF_UNIX) as sock: 907 try: 908 sock.connect(server_utils.SOCKET_ADDRESS) 909 except OSError: 910 print('Build server is not running and ' 911 'android_static_analysis="build_server" is set.\nPlease run ' 912 'this command in a separate terminal:\n\n' 913 '$ build/android/fast_local_dev_server.py\n') 914 return 1 915 else: 916 return 0 917 918 919def _send_message_and_close(message_dict): 920 with contextlib.closing(socket.socket(socket.AF_UNIX)) as sock: 921 sock.connect(server_utils.SOCKET_ADDRESS) 922 sock.settimeout(1) 923 server_utils.SendMessage(sock, message_dict) 924 925 926def _send_message_with_response(message_dict): 927 with contextlib.closing(socket.socket(socket.AF_UNIX)) as sock: 928 sock.connect(server_utils.SOCKET_ADDRESS) 929 sock.settimeout(1) 930 server_utils.SendMessage(sock, message_dict) 931 return server_utils.ReceiveMessage(sock) 932 933 934def _send_cancel_build(build_id): 935 _send_message_and_close({ 936 'message_type': server_utils.CANCEL_BUILD, 937 'build_id': build_id, 938 }) 939 return 0 940 941 942def _register_builder(build_id, builder_pid, output_directory): 943 if output_directory is not None: 944 output_directory = str(pathlib.Path(output_directory).absolute()) 945 for _attempt in range(3): 946 try: 947 # Ensure environment variables that the server expects to be there are 948 # present. 
949 server_utils.AssertEnvironmentVariables() 950 951 _send_message_and_close({ 952 'message_type': server_utils.REGISTER_BUILDER, 953 'env': dict(os.environ), 954 'builder_pid': builder_pid, 955 'cwd': output_directory, 956 }) 957 return 0 958 except OSError: 959 time.sleep(0.05) 960 print(f'Failed to register builer for build_id={build_id}.') 961 return 1 962 963 964def poll_server(retries=3): 965 """Communicates with the main server to query build info.""" 966 for _attempt in range(retries): 967 try: 968 response = _send_message_with_response( 969 {'message_type': server_utils.POLL_HEARTBEAT}) 970 if response: 971 break 972 except OSError: 973 time.sleep(0.05) 974 else: 975 return None 976 return response['pid'] 977 978 979def _print_build_status_all(): 980 try: 981 query_data = query_build_info(None) 982 except ConnectionRefusedError: 983 print('No server running. Consult $OUTDIR/buildserver.log.0') 984 return 0 985 builds = query_data['builds'] 986 pid = query_data['pid'] 987 all_active_tasks = [] 988 print(f'Build server (PID={pid}) has {len(builds)} registered builds') 989 for build_info in builds: 990 build_id = build_info['build_id'] 991 pending_tasks = build_info['pending_tasks'] 992 completed_tasks = build_info['completed_tasks'] 993 active_tasks = build_info['active_tasks'] 994 out_dir = build_info['outdir'] 995 active = build_info['is_active'] 996 total_tasks = pending_tasks + completed_tasks 997 all_active_tasks += active_tasks 998 if total_tasks == 0 and not active: 999 status = 'Finished without any jobs' 1000 else: 1001 if active: 1002 status = 'Siso still running' 1003 else: 1004 status = 'Siso finished' 1005 if out_dir: 1006 status += f' in {out_dir}' 1007 status += f'. Completed [{completed_tasks}/{total_tasks}].' 
1008 if completed_tasks < total_tasks: 1009 status += f' {len(active_tasks)} task(s) currently executing' 1010 print(f'{build_id}: {status}') 1011 if all_active_tasks: 1012 total = len(all_active_tasks) 1013 to_show = min(4, total) 1014 print(f'Currently executing (showing {to_show} of {total}):') 1015 for cmd in sorted(all_active_tasks)[:to_show]: 1016 truncated = shlex.join(cmd) 1017 if len(truncated) > 200: 1018 truncated = truncated[:200] + '...' 1019 print(truncated) 1020 return 0 1021 1022 1023def _print_build_status(build_id): 1024 server_path = os.path.relpath(str(server_utils.SERVER_SCRIPT)) 1025 try: 1026 builds = query_build_info(build_id)['builds'] 1027 if not builds: 1028 print(f'⚠️ No build found with id ({build_id})') 1029 print('⚠️ To see the status of all builds:', 1030 shlex.join([server_path, '--print-status-all'])) 1031 return 1 1032 build_info = builds[0] 1033 except ConnectionRefusedError: 1034 print('⚠️ No server running. Consult $OUTDIR/buildserver.log.0') 1035 return 0 1036 pending_tasks = build_info['pending_tasks'] 1037 1038 # Print nothing unless there are still pending tasks 1039 if pending_tasks: 1040 is_str = 'is' if pending_tasks == 1 else 'are' 1041 job_str = 'job' if pending_tasks == 1 else 'jobs' 1042 print(f'⏩ There {is_str} still {pending_tasks} static analysis {job_str}' 1043 ' running in the background.') 1044 print('⏩ To wait for them:', shlex.join([server_path, '--wait-for-idle'])) 1045 return 0 1046 1047 1048def _wait_for_task_requests(exit_on_idle): 1049 with socket.socket(socket.AF_UNIX) as sock: 1050 sock.settimeout(_SOCKET_TIMEOUT) 1051 try: 1052 sock.bind(server_utils.SOCKET_ADDRESS) 1053 except OSError as e: 1054 # errno 98 is Address already in use 1055 if e.errno == 98: 1056 if not OptionsManager.is_quiet(): 1057 pid = poll_server() 1058 print(f'Another instance is already running (pid: {pid}).', 1059 file=sys.stderr) 1060 return 1 1061 raise 1062 sock.listen() 1063 _process_requests(sock, exit_on_idle) 1064 return 
def _wait_for_task_requests(exit_on_idle):
  """Binds the server socket and processes requests on it.

  Args:
    exit_on_idle: Forwarded unchanged to _process_requests().

  Returns:
    1 if the socket address is already in use (another instance is running),
    otherwise 0 once _process_requests() returns.

  Raises:
    OSError: If bind() fails for any reason other than EADDRINUSE.
  """
  # Local import so this function does not depend on a file-level import.
  import errno  # pylint: disable=import-outside-toplevel

  with socket.socket(socket.AF_UNIX) as sock:
    sock.settimeout(_SOCKET_TIMEOUT)
    try:
      sock.bind(server_utils.SOCKET_ADDRESS)
    except OSError as e:
      # Use the symbolic constant rather than the raw value (98 on Linux).
      if e.errno != errno.EADDRINUSE:
        raise
      if not OptionsManager.is_quiet():
        pid = poll_server()
        print(f'Another instance is already running (pid: {pid}).',
              file=sys.stderr)
      return 1
    sock.listen()
    _process_requests(sock, exit_on_idle)
  return 0
def _build_arg_parser():
  """Returns the ArgumentParser declaring every flag main() understands."""
  parser = argparse.ArgumentParser(description=__doc__)
  add_flag = parser.add_argument

  add_flag('--fail-if-not-running',
           action='store_true',
           help='Used by GN to fail fast if the build server is not running.')
  add_flag('--exit-on-idle',
           action='store_true',
           help='Server started on demand. Exit when all tasks run out.')
  add_flag('--quiet',
           action='store_true',
           help='Do not output status updates.')
  add_flag('--no-remote-print',
           action='store_true',
           help='Do not output errors to remote terminals.')
  add_flag('--wait-for-build',
           metavar='BUILD_ID',
           help='Wait for build server to finish with all tasks '
           'for BUILD_ID and output any pending messages.')
  add_flag('--wait-for-idle',
           action='store_true',
           help='Wait for build server to finish with all '
           'pending tasks.')
  add_flag('--print-status',
           metavar='BUILD_ID',
           help='Print the current state of a build.')
  add_flag('--print-status-all',
           action='store_true',
           help='Print the current state of all active builds.')
  add_flag('--register-build-id',
           metavar='BUILD_ID',
           help='Inform the build server that a new build has started.')
  add_flag('--output-directory',
           help='Build directory (use with --register-build-id)')
  add_flag('--builder-pid',
           help='Builder process\'s pid for build BUILD_ID.')
  add_flag('--cancel-build',
           metavar='BUILD_ID',
           help='Cancel all pending and running tasks for BUILD_ID.')
  return parser


def main():
  """Parses the command line and runs the first requested action.

  The flags are checked in a fixed order and the first one that is set decides
  which action runs. With no action flag, the server loop is started.

  Returns:
    The return value of whichever action was run.
  """
  # pylint: disable=too-many-return-statements
  args = _build_arg_parser().parse_args()
  # Publish the options globally before any action reads them.
  OptionsManager.set_options(args)

  # Client-style actions.
  if args.fail_if_not_running:
    return _check_if_running()
  if args.wait_for_build:
    return _wait_for_build(args.wait_for_build)
  if args.wait_for_idle:
    return _wait_for_idle()
  if args.print_status:
    return _print_build_status(args.print_status)
  if args.print_status_all:
    return _print_build_status_all()
  if args.register_build_id:
    return _register_builder(args.register_build_id, args.builder_pid,
                             args.output_directory)
  if args.cancel_build:
    return _send_cancel_build(args.cancel_build)

  # No action flag given: act as the server.
  return _wait_for_task_requests(args.exit_on_idle)


if __name__ == '__main__':
  sys.excepthook = _exception_hook
  sys.exit(main())