#!/usr/bin/env python3
# Copyright 2021 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Creates a server to offload non-critical-path GN targets."""

from __future__ import annotations

import argparse
import json
import os
import queue
import shutil
import socket
import subprocess
import sys
import threading
from typing import Callable, Dict, List, Optional, Tuple

sys.path.append(os.path.join(os.path.dirname(__file__), 'gyp'))
from util import server_utils


def log(msg: str, *, end: str = ''):
  """Prints a one-line status message over the current terminal line.

  Args:
    msg: The text to display after the '[<task stats>] ' prefix. It is
      shortened with an ellipsis when it does not fit in the terminal width.
    end: Line ending passed to print(). The default of '' keeps the cursor on
      the same line so that the next call overwrites this message.
  """
  # Shrink the message (leaving a 2-char prefix and use the rest of the room
  # for the suffix) according to terminal size so it is always one line.
  width = shutil.get_terminal_size().columns
  prefix = f'[{TaskStats.prefix()}] '
  max_msg_width = width - len(prefix)
  if len(msg) > max_msg_width:
    # Account for ellipsis and header. Clamp at zero: a zero or negative slice
    # bound would otherwise select (almost) the whole message instead of
    # shortening it when the terminal is very narrow.
    length_to_show = max(max_msg_width - 5, 0)
    suffix = msg[-length_to_show:] if length_to_show else ''
    msg = f'{msg[:2]}...{suffix}'
  # \r to return the carriage to the beginning of line.
  # \033[K to replace the normal \n to erase until the end of the line.
  # Avoid the default line ending so the next \r overwrites the same line just
  # like ninja's output.
  print(f'\r{prefix}{msg}\033[K', end=end, flush=True)


class TaskStats:
  """Class to keep track of aggregate stats for all tasks across threads."""
  # Number of task subprocesses currently counted as running.
  _num_processes = 0
  # Number of tasks that have finished or been terminated.
  _completed_tasks = 0
  # Number of tasks that have ever been queued.
  _total_tasks = 0
  # Guards the counters that are updated from worker threads.
  _lock = threading.Lock()

  @classmethod
  def no_running_processes(cls):
    """Returns True when no task process is currently counted as running."""
    return cls._num_processes == 0

  @classmethod
  def add_task(cls):
    """Records that one more task has been queued."""
    # Only the main thread calls this, so there is no need for locking.
    cls._total_tasks += 1

  @classmethod
  def add_process(cls):
    """Records that a task process is about to be started."""
    with cls._lock:
      cls._num_processes += 1

  @classmethod
  def remove_process(cls):
    """Records that a task process has exited."""
    with cls._lock:
      cls._num_processes -= 1

  @classmethod
  def complete_task(cls):
    """Records that a task has completed (finished or terminated)."""
    with cls._lock:
      cls._completed_tasks += 1

  @classmethod
  def prefix(cls):
    """Returns the '<N> process(es), <completed>/<total>' status string."""
    # Ninja's prefix is: [205 processes, 6/734 @ 6.5/s : 0.922s ]
    # Time taken and task completion rate are not important for the build server
    # since it is always running in the background and uses idle priority for
    # its tasks.
    with cls._lock:
      word = 'process' if cls._num_processes == 1 else 'processes'
      return (f'{cls._num_processes} {word}, '
              f'{cls._completed_tasks}/{cls._total_tasks}')


class TaskManager:
  """Class to encapsulate a threadsafe queue and handle deactivating it."""

  def __init__(self):
    self._queue: queue.SimpleQueue[Task] = queue.SimpleQueue()
    self._deactivated = False

  def add_task(self, task: Task):
    """Queues |task| and starts queued tasks if the machine load allows it."""
    assert not self._deactivated
    TaskStats.add_task()
    self._queue.put(task)
    log(f'QUEUED {task.name}')
    self._maybe_start_tasks()

  def deactivate(self):
    """Stops starting new tasks and terminates every task still queued."""
    self._deactivated = True
    while not self._queue.empty():
      try:
        task = self._queue.get_nowait()
      except queue.Empty:
        return
      task.terminate()

  @staticmethod
  def _num_running_processes():
    """Returns the 'procs_running' value reported by /proc/stat."""
    with open('/proc/stat') as f:
      for line in f:
        if line.startswith('procs_running'):
          return int(line.rstrip().split()[1])
    assert False, 'Could not read /proc/stat'
    return 0

  def _maybe_start_tasks(self):
    """Starts up to two queued tasks when the overall load is light enough."""
    if self._deactivated:
      return
    # Include load avg so that a small dip in the number of currently running
    # processes will not cause new tasks to be started while the overall load is
    # heavy.
    cur_load = max(self._num_running_processes(), os.getloadavg()[0])
    # os.cpu_count() returns None when the count cannot be determined; fall
    # back to 1 so that the comparison below cannot raise a TypeError.
    cpu_count = os.cpu_count() or 1
    num_started = 0
    # Always start a task if we don't have any running, so that all tasks are
    # eventually finished. Try starting up tasks when the overall load is light.
    # Limit to at most 2 new tasks to prevent ramping up too fast. There is a
    # chance where multiple threads call _maybe_start_tasks and each gets to
    # spawn up to 2 new tasks, but since the only downside is some build tasks
    # get worked on earlier rather than later, it is not worth mitigating.
    while num_started < 2 and (TaskStats.no_running_processes()
                               or num_started + cur_load < cpu_count):
      try:
        next_task = self._queue.get_nowait()
      except queue.Empty:
        return
      num_started += next_task.start(self._maybe_start_tasks)


# TODO(wnwen): Break this into Request (encapsulating what ninja sends) and Task
#              when a Request starts to be run. This would eliminate ambiguity
#              about when and whether _proc/_thread are initialized.
class Task:
  """Class to represent one task and operations on it."""

  def __init__(self, name: str, cwd: str, cmd: List[str], stamp_file: str):
    self.name = name
    self.cwd = cwd
    self.cmd = cmd
    self.stamp_file = stamp_file
    self._terminated = False
    self._replaced = False
    # Guards _terminated/_replaced and the creation of _proc/_thread.
    self._lock = threading.Lock()
    self._proc: Optional[subprocess.Popen] = None
    self._thread: Optional[threading.Thread] = None
    self._return_code: Optional[int] = None

  @property
  def key(self):
    """The (cwd, name) tuple used to detect requests for the same target."""
    return (self.cwd, self.name)

  def start(self, on_complete_callback: Callable[[], None]) -> int:
    """Starts the task if it has not already been terminated.

    Returns the number of processes that have been started. This is called at
    most once when the task is popped off the task queue."""

    # The environment variable forces the script to actually run in order to
    # avoid infinite recursion.
    env = os.environ.copy()
    env[server_utils.BUILD_SERVER_ENV_VARIABLE] = '1'

    with self._lock:
      if self._terminated:
        return 0
      # Use os.nice(19) to ensure the lowest priority (idle) for these analysis
      # tasks since we want to avoid slowing down the actual build.
      # TODO(wnwen): Use ionice to reduce resource consumption.
      TaskStats.add_process()
      log(f'STARTING {self.name}')
      # This use of preexec_fn is sufficiently simple, just one os.nice call.
      # pylint: disable=subprocess-popen-preexec-fn
      self._proc = subprocess.Popen(
          self.cmd,
          stdout=subprocess.PIPE,
          stderr=subprocess.STDOUT,
          cwd=self.cwd,
          env=env,
          text=True,
          preexec_fn=lambda: os.nice(19),
      )
      self._thread = threading.Thread(
          target=self._complete_when_process_finishes,
          args=(on_complete_callback, ))
      self._thread.start()
      return 1

  def terminate(self, replaced=False):
    """Can be called multiple times to cancel and ignore the task's output.

    Args:
      replaced: True when the task is cancelled because a newer request for
        the same key arrived; the stamp file is then left in place.
    """

    with self._lock:
      if self._terminated:
        return
      self._terminated = True
      self._replaced = replaced
    # It is safe to access _proc and _thread outside of _lock since they are
    # only changed by self.start holding _lock when self._terminated is false.
    # Since we have just set self._terminated to true inside of _lock, we know
    # that neither _proc nor _thread will be changed from this point onwards.
    if self._proc:
      self._proc.terminate()
      self._proc.wait()
    # Ensure that self._complete is called either by the thread or by us.
    if self._thread:
      self._thread.join()
    else:
      self._complete()

  def _complete_when_process_finishes(self,
                                      on_complete_callback: Callable[[], None]):
    """Thread target: waits for the process, then reports its completion."""
    assert self._proc
    # We know Popen.communicate will return a str and not a byte since it is
    # constructed with text=True.
    stdout: str = self._proc.communicate()[0]
    self._return_code = self._proc.returncode
    TaskStats.remove_process()
    self._complete(stdout)
    on_complete_callback()

  def _complete(self, stdout: str = ''):
    """Update the user and ninja after the task has run or been terminated.

    This method should only be run once per task. Avoid modifying the task so
    that this method does not need locking."""

    TaskStats.complete_task()
    delete_stamp = False
    if self._terminated:
      log(f'TERMINATED {self.name}')
      # When tasks are replaced, avoid deleting the stamp file, context:
      # https://issuetracker.google.com/301961827.
      if not self._replaced:
        delete_stamp = True
    else:
      log(f'FINISHED {self.name}')
      if stdout or self._return_code != 0:
        delete_stamp = True
        # An extra new line is needed since we want to preserve the previous
        # log line. Use a single print so that it is threadsafe.
        # TODO(wnwen): Improve stdout display by parsing over it and moving the
        #              actual error to the bottom. Otherwise long command lines
        #              in the Traceback section obscure the actual error(s).
        print('\n' + '\n'.join([
            f'FAILED: {self.name}',
            f'Return code: {self._return_code}',
            ' '.join(self.cmd),
            stdout,
        ]))

    if delete_stamp:
      # Force ninja to consider failed targets as dirty.
      try:
        os.unlink(os.path.join(self.cwd, self.stamp_file))
      except FileNotFoundError:
        pass
    else:
      # Ninja will rebuild targets when their inputs change even if their stamp
      # file has a later modified time. Thus we do not need to worry about the
      # script being run by the build server updating the mtime incorrectly.
      pass


def _listen_for_request_data(sock: socket.socket):
  """Yields the decoded JSON payload of each connection accepted on |sock|.

  Connections that send no data are skipped. Payloads that are not valid JSON
  are logged and skipped so that one bad client cannot stop the server.
  """
  while True:
    conn = sock.accept()[0]
    received = []
    with conn:
      while True:
        data = conn.recv(4096)
        if not data:
          break
        received.append(data)
    if received:
      try:
        request = json.loads(b''.join(received))
      except ValueError as e:
        # json.JSONDecodeError and UnicodeDecodeError both derive from
        # ValueError.
        log(f'IGNORED malformed request: {e}', end='\n')
        continue
      yield request


def _process_requests(sock: socket.socket):
  """Serves requests from |sock| until interrupted with Ctrl-C.

  Each request becomes a Task; a new request with the same key terminates the
  previous task for that key. On KeyboardInterrupt all queued and running
  tasks are terminated before returning.
  """
  # Since dicts in python can contain anything, explicitly type tasks to help
  # make static type checking more useful.
  tasks: Dict[Tuple[str, str], Task] = {}
  task_manager = TaskManager()
  try:
    log('READY... Remember to set android_static_analysis="build_server" in '
        'args.gn files')
    for data in _listen_for_request_data(sock):
      try:
        task = Task(name=data['name'],
                    cwd=data['cwd'],
                    cmd=data['cmd'],
                    stamp_file=data['stamp_file'])
      except (KeyError, TypeError) as e:
        # The payload was valid JSON but not an object with the required
        # fields; skip it rather than taking the whole server down.
        log(f'IGNORED invalid request: {e!r}', end='\n')
        continue
      existing_task = tasks.get(task.key)
      if existing_task:
        existing_task.terminate(replaced=True)
      tasks[task.key] = task
      task_manager.add_task(task)
  except KeyboardInterrupt:
    log('STOPPING SERVER...', end='\n')
    # Gracefully shut down the task manager, terminating all queued tasks.
    task_manager.deactivate()
    # Terminate all currently running tasks.
    for task in tasks.values():
      task.terminate()
    log('STOPPED', end='\n')


def main():
  """Runs the build server, or only checks that one is already running.

  Returns:
    The process exit code: 1 when --fail-if-not-running is passed and no
    server accepts a connection on the socket address, 0 otherwise.
  """
  parser = argparse.ArgumentParser(description=__doc__)
  parser.add_argument(
      '--fail-if-not-running',
      action='store_true',
      help='Used by GN to fail fast if the build server is not running.')
  args = parser.parse_args()
  if args.fail_if_not_running:
    with socket.socket(socket.AF_UNIX) as sock:
      try:
        sock.connect(server_utils.SOCKET_ADDRESS)
      except socket.error:
        print('Build server is not running and '
              'android_static_analysis="build_server" is set.\nPlease run '
              'this command in a separate terminal:\n\n'
              '$ build/android/fast_local_dev_server.py\n')
        return 1
      else:
        return 0
  with socket.socket(socket.AF_UNIX) as sock:
    sock.bind(server_utils.SOCKET_ADDRESS)
    sock.listen()
    _process_requests(sock)
  return 0


if __name__ == '__main__':
  sys.exit(main())