# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Run a group of subprocesses and then finish."""

from __future__ import print_function

import errno
import logging
import multiprocessing
import os
import platform
import re
import signal
import subprocess
import sys
import tempfile
import time

# cpu cost measurement
measure_cpu_costs = False

_DEFAULT_MAX_JOBS = 16 * multiprocessing.cpu_count()
# Maximum number of bytes of a job's stdout that will be stored in the result.
# Only the last N bytes of stdout are kept if the actual output is longer.
_MAX_RESULT_SIZE = 64 * 1024


# NOTE: If you change this, please make sure to test reviewing the
# github PR with http://reviewable.io, which is known to add UTF-8
# characters to the PR description, which leak into the environment here
# and cause failures.
def strip_non_ascii_chars(s):
    return ''.join(c for c in s if ord(c) < 128)


def sanitized_environment(env):
    sanitized = {}
    for key, value in env.items():
        sanitized[strip_non_ascii_chars(key)] = strip_non_ascii_chars(value)
    return sanitized


def platform_string():
    if platform.system() == 'Windows':
        return 'windows'
    elif platform.system()[:7] == 'MSYS_NT':
        return 'windows'
    elif platform.system() == 'Darwin':
        return 'mac'
    elif platform.system() == 'Linux':
        return 'linux'
    else:
        return 'posix'


# setup a signal handler so that signal.pause registers 'something'
# when a child finishes
# not using futures and threading to avoid a dependency on subprocess32
if platform_string() == 'windows':
    pass
else:

    def alarm_handler(unused_signum, unused_frame):
        pass

    signal.signal(signal.SIGCHLD, lambda unused_signum, unused_frame: None)
    signal.signal(signal.SIGALRM, alarm_handler)

_SUCCESS = object()
_FAILURE = object()
_RUNNING = object()
_KILLED = object()

_COLORS = {
    'red': [31, 0],
    'green': [32, 0],
    'yellow': [33, 0],
    'lightgray': [37, 0],
    'gray': [30, 1],
    'purple': [35, 0],
    'cyan': [36, 0]
}

_BEGINNING_OF_LINE = '\x1b[0G'
_CLEAR_LINE = '\x1b[2K'

_TAG_COLOR = {
    'FAILED': 'red',
    'FLAKE': 'purple',
    'TIMEOUT_FLAKE': 'purple',
    'WARNING': 'yellow',
    'TIMEOUT': 'red',
    'PASSED': 'green',
    'START': 'gray',
    'WAITING': 'yellow',
    'SUCCESS': 'green',
    'IDLE': 'gray',
    'SKIPPED': 'cyan'
}

_FORMAT = '%(asctime)-15s %(message)s'
logging.basicConfig(level=logging.INFO, format=_FORMAT)


def eintr_be_gone(fn):
    """Run fn until it doesn't stop because of EINTR."""
    while True:
        try:
            return fn()
        except IOError as e:
            if e.errno != errno.EINTR:
                raise


def message(tag, msg, explanatory_text=None, do_newline=False):
    if message.old_tag == tag and message.old_msg == msg and not explanatory_text:
        return
    message.old_tag = tag
    message.old_msg = msg
    while True:
        try:
            if platform_string() == 'windows' or not sys.stdout.isatty():
                if explanatory_text:
                    logging.info(explanatory_text)
                logging.info('%s: %s', tag, msg)
            else:
                sys.stdout.write(
                    '%s%s%s\x1b[%d;%dm%s\x1b[0m: %s%s' %
                    (_BEGINNING_OF_LINE, _CLEAR_LINE, '\n%s' % explanatory_text
                     if explanatory_text is not None else '',
                     _COLORS[_TAG_COLOR[tag]][1], _COLORS[_TAG_COLOR[tag]][0],
                     tag, msg, '\n'
                     if do_newline or explanatory_text is not None else ''))
                sys.stdout.flush()
            return
        except IOError as e:
            if e.errno != errno.EINTR:
                raise


message.old_tag = ''
message.old_msg = ''


def which(filename):
    if '/' in filename:
        return filename
    for path in os.environ['PATH'].split(os.pathsep):
        if os.path.exists(os.path.join(path, filename)):
            return os.path.join(path, filename)
    raise Exception('%s not found' % filename)


class JobSpec(object):
    """Specifies what to run for a job."""

    def __init__(self,
                 cmdline,
                 shortname=None,
                 environ=None,
                 cwd=None,
                 shell=False,
                 timeout_seconds=5 * 60,
                 flake_retries=0,
                 timeout_retries=0,
                 kill_handler=None,
                 cpu_cost=1.0,
                 verbose_success=False):
        """
        Arguments:
          cmdline: a list of arguments to pass as the command line
          environ: a dictionary of environment variables to set in the child process
          kill_handler: a handler that will be called whenever job.kill() is invoked
          cpu_cost: number of cores per second this job needs
        """
        if environ is None:
            environ = {}
        self.cmdline = cmdline
        self.environ = environ
        self.shortname = cmdline[0] if shortname is None else shortname
        self.cwd = cwd
        self.shell = shell
        self.timeout_seconds = timeout_seconds
        self.flake_retries = flake_retries
        self.timeout_retries = timeout_retries
        self.kill_handler = kill_handler
        self.cpu_cost = cpu_cost
        self.verbose_success = verbose_success

    def identity(self):
        return '%r %r' % (self.cmdline, self.environ)

    def __hash__(self):
        return hash(self.identity())

    def __eq__(self, other):
        return self.identity() == other.identity()

    def __repr__(self):
        return 'JobSpec(shortname=%s, cmdline=%s)' % (self.shortname,
                                                      self.cmdline)

    def __str__(self):
        return '%s: %s %s' % (self.shortname,
                              ' '.join('%s=%s' % kv
                                       for kv in self.environ.items()),
                              ' '.join(self.cmdline))
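# Example (an illustrative sketch, not used by the runner itself): constructing
# a JobSpec for a hypothetical test binary. The command line, shortname,
# environment variable and cpu_cost values below are made-up placeholders, not
# real gRPC test targets.
#
#   spec = JobSpec(['bins/opt/example_test', '--foo'],
#                  shortname='example_test',
#                  environ={'EXAMPLE_FLAG': '1'},
#                  cpu_cost=2.0,
#                  timeout_seconds=10 * 60,
#                  flake_retries=1)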


class JobResult(object):

    def __init__(self):
        self.state = 'UNKNOWN'
        self.returncode = -1
        self.elapsed_time = 0
        self.num_failures = 0
        self.retries = 0
        self.message = ''
        self.cpu_estimated = 1
        self.cpu_measured = 1


def read_from_start(f):
    f.seek(0)
    return f.read()


class Job(object):
    """Manages one job."""

    def __init__(self,
                 spec,
                 newline_on_success,
                 travis,
                 add_env,
                 quiet_success=False):
        self._spec = spec
        self._newline_on_success = newline_on_success
        self._travis = travis
        self._add_env = add_env.copy()
        self._retries = 0
        self._timeout_retries = 0
        self._suppress_failure_message = False
        self._quiet_success = quiet_success
        if not self._quiet_success:
            message('START', spec.shortname, do_newline=self._travis)
        self.result = JobResult()
        self.start()

    def GetSpec(self):
        return self._spec

    def start(self):
        self._tempfile = tempfile.TemporaryFile()
        env = dict(os.environ)
        env.update(self._spec.environ)
        env.update(self._add_env)
        env = sanitized_environment(env)
        self._start = time.time()
        cmdline = self._spec.cmdline
        # The Unix time command is finicky when used with MSBuild, so we don't
        # use it with jobs that run MSBuild.
        global measure_cpu_costs
        if measure_cpu_costs and 'vsprojects\\build' not in cmdline[0]:
            cmdline = ['time', '-p'] + cmdline
        else:
            measure_cpu_costs = False
        try_start = lambda: subprocess.Popen(args=cmdline,
                                             stderr=subprocess.STDOUT,
                                             stdout=self._tempfile,
                                             cwd=self._spec.cwd,
                                             shell=self._spec.shell,
                                             env=env)
        delay = 0.3
        for i in range(0, 4):
            try:
                self._process = try_start()
                break
            except OSError:
                message('WARNING',
                        'Failed to start %s, retrying in %f seconds' %
                        (self._spec.shortname, delay))
                time.sleep(delay)
                delay *= 2
        else:
            self._process = try_start()
        self._state = _RUNNING

    def state(self):
        """Poll current state of the job. Prints messages at completion."""

        def stdout(self=self):
            stdout = read_from_start(self._tempfile)
            self.result.message = stdout[-_MAX_RESULT_SIZE:]
            return stdout

        if self._state == _RUNNING and self._process.poll() is not None:
            elapsed = time.time() - self._start
            self.result.elapsed_time = elapsed
            if self._process.returncode != 0:
                if self._retries < self._spec.flake_retries:
                    message(
                        'FLAKE',
                        '%s [ret=%d, pid=%d]' %
                        (self._spec.shortname, self._process.returncode,
                         self._process.pid),
                        stdout(),
                        do_newline=True)
                    self._retries += 1
                    self.result.num_failures += 1
                    self.result.retries = self._timeout_retries + self._retries
                    # NOTE: job is restarted regardless of jobset's max_time setting
                    self.start()
                else:
                    self._state = _FAILURE
                    if not self._suppress_failure_message:
                        message(
                            'FAILED',
                            '%s [ret=%d, pid=%d, time=%.1fsec]' %
                            (self._spec.shortname, self._process.returncode,
                             self._process.pid, elapsed),
                            stdout(),
                            do_newline=True)
                    self.result.state = 'FAILED'
                    self.result.num_failures += 1
                    self.result.returncode = self._process.returncode
            else:
                self._state = _SUCCESS
                measurement = ''
                if measure_cpu_costs:
                    m = re.search(
                        r'real\s+([0-9.]+)\nuser\s+([0-9.]+)\nsys\s+([0-9.]+)',
                        stdout())
                    real = float(m.group(1))
                    user = float(m.group(2))
                    sys = float(m.group(3))
                    if real > 0.5:
                        cores = (user + sys) / real
                        self.result.cpu_measured = float('%.01f' % cores)
                        self.result.cpu_estimated = float(
                            '%.01f' % self._spec.cpu_cost)
                        measurement = '; cpu_cost=%.01f; estimated=%.01f' % (
                            self.result.cpu_measured,
                            self.result.cpu_estimated)
                if not self._quiet_success:
                    message(
                        'PASSED',
                        '%s [time=%.1fsec, retries=%d:%d%s]' %
                        (self._spec.shortname, elapsed, self._retries,
                         self._timeout_retries, measurement),
                        stdout() if self._spec.verbose_success else None,
                        do_newline=self._newline_on_success or self._travis)
                self.result.state = 'PASSED'
        elif (self._state == _RUNNING and
              self._spec.timeout_seconds is not None and
              time.time() - self._start > self._spec.timeout_seconds):
            elapsed = time.time() - self._start
            self.result.elapsed_time = elapsed
            if self._timeout_retries < self._spec.timeout_retries:
                message(
                    'TIMEOUT_FLAKE',
                    '%s [pid=%d]' % (self._spec.shortname, self._process.pid),
                    stdout(),
                    do_newline=True)
                self._timeout_retries += 1
                self.result.num_failures += 1
                self.result.retries = self._timeout_retries + self._retries
                if self._spec.kill_handler:
                    self._spec.kill_handler(self)
                self._process.terminate()
                # NOTE: job is restarted regardless of jobset's max_time setting
                self.start()
            else:
                message(
                    'TIMEOUT',
                    '%s [pid=%d, time=%.1fsec]' %
                    (self._spec.shortname, self._process.pid, elapsed),
                    stdout(),
                    do_newline=True)
                self.kill()
                self.result.state = 'TIMEOUT'
                self.result.num_failures += 1
        return self._state

    def kill(self):
        if self._state == _RUNNING:
            self._state = _KILLED
            if self._spec.kill_handler:
                self._spec.kill_handler(self)
            self._process.terminate()

    def suppress_failure_message(self):
        self._suppress_failure_message = True


class Jobset(object):
    """Manages one run of jobs."""

    def __init__(self, check_cancelled, maxjobs, maxjobs_cpu_agnostic,
                 newline_on_success, travis, stop_on_failure, add_env,
                 quiet_success, max_time):
        self._running = set()
        self._check_cancelled = check_cancelled
        self._cancelled = False
        self._failures = 0
        self._completed = 0
        self._maxjobs = maxjobs
        self._maxjobs_cpu_agnostic = maxjobs_cpu_agnostic
        self._newline_on_success = newline_on_success
        self._travis = travis
        self._stop_on_failure = stop_on_failure
        self._add_env = add_env
        self._quiet_success = quiet_success
        self._max_time = max_time
        self.resultset = {}
        self._remaining = None
        self._start_time = time.time()

    def set_remaining(self, remaining):
        self._remaining = remaining

    def get_num_failures(self):
        return self._failures

    def cpu_cost(self):
        c = 0
        for job in self._running:
            c += job._spec.cpu_cost
        return c

    def start(self, spec):
        """Start a job. Return True on success, False on failure."""
        while True:
            if (self._max_time > 0 and
                    time.time() - self._start_time > self._max_time):
                skipped_job_result = JobResult()
                skipped_job_result.state = 'SKIPPED'
                message('SKIPPED', spec.shortname, do_newline=True)
                self.resultset[spec.shortname] = [skipped_job_result]
                return True
            if self.cancelled(): return False
            current_cpu_cost = self.cpu_cost()
            if current_cpu_cost == 0: break
            if current_cpu_cost + spec.cpu_cost <= self._maxjobs:
                if len(self._running) < self._maxjobs_cpu_agnostic:
                    break
            self.reap(spec.shortname, spec.cpu_cost)
        if self.cancelled(): return False
        job = Job(spec, self._newline_on_success, self._travis, self._add_env,
                  self._quiet_success)
        self._running.add(job)
        if job.GetSpec().shortname not in self.resultset:
            self.resultset[job.GetSpec().shortname] = []
        return True

    def reap(self, waiting_for=None, waiting_for_cost=None):
        """Collect the dead jobs."""
        while self._running:
            dead = set()
            for job in self._running:
                st = eintr_be_gone(lambda: job.state())
                if st == _RUNNING: continue
                if st == _FAILURE or st == _KILLED:
                    self._failures += 1
                    if self._stop_on_failure:
                        self._cancelled = True
                        for job in self._running:
                            job.kill()
                dead.add(job)
                break
            for job in dead:
                self._completed += 1
                if not self._quiet_success or job.result.state != 'PASSED':
                    self.resultset[job.GetSpec().shortname].append(job.result)
                self._running.remove(job)
            if dead: return
            if not self._travis and platform_string() != 'windows':
                rstr = '' if self._remaining is None else '%d queued, ' % self._remaining
                if self._remaining is not None and self._completed > 0:
                    now = time.time()
                    sofar = now - self._start_time
                    remaining = sofar / self._completed * (
                        self._remaining + len(self._running))
                    rstr = 'ETA %.1f sec; %s' % (remaining, rstr)
                if waiting_for is not None:
                    wstr = ' next: %s @ %.2f cpu' % (waiting_for,
                                                     waiting_for_cost)
                else:
                    wstr = ''
                message(
                    'WAITING',
                    '%s%d jobs running, %d complete, %d failed (load %.2f)%s' %
                    (rstr, len(self._running), self._completed, self._failures,
                     self.cpu_cost(), wstr))
            if platform_string() == 'windows':
                time.sleep(0.1)
            else:
                signal.alarm(10)
                signal.pause()

    def cancelled(self):
        """Poll for cancellation."""
        if self._cancelled: return True
        if not self._check_cancelled(): return False
        for job in self._running:
            job.kill()
        self._cancelled = True
        return True

    def finish(self):
        while self._running:
            if self.cancelled(): pass  # poll cancellation
            self.reap()
        if platform_string() != 'windows':
            signal.alarm(0)
        return not self.cancelled() and self._failures == 0


def _never_cancelled():
    return False


def tag_remaining(xs):
    staging = []
    for x in xs:
        staging.append(x)
        if len(staging) > 5000:
            yield (staging.pop(0), None)
    n = len(staging)
    for i, x in enumerate(staging):
        yield (x, n - i - 1)


def run(cmdlines,
        check_cancelled=_never_cancelled,
        maxjobs=None,
        maxjobs_cpu_agnostic=None,
        newline_on_success=False,
        travis=False,
        infinite_runs=False,
        stop_on_failure=False,
        add_env={},
        skip_jobs=False,
        quiet_success=False,
        max_time=-1):
    if skip_jobs:
        resultset = {}
        skipped_job_result = JobResult()
        skipped_job_result.state = 'SKIPPED'
        for job in cmdlines:
            message('SKIPPED', job.shortname, do_newline=True)
            resultset[job.shortname] = [skipped_job_result]
        return 0, resultset
    js = Jobset(check_cancelled,
                maxjobs if maxjobs is not None else _DEFAULT_MAX_JOBS,
                maxjobs_cpu_agnostic
                if maxjobs_cpu_agnostic is not None else _DEFAULT_MAX_JOBS,
                newline_on_success, travis, stop_on_failure, add_env,
                quiet_success, max_time)
    for cmdline, remaining in tag_remaining(cmdlines):
        if not js.start(cmdline):
            break
        if remaining is not None:
            js.set_remaining(remaining)
    js.finish()
    return js.get_num_failures(), js.resultset
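

if __name__ == '__main__':
    # Minimal usage sketch (an illustrative addition, not part of the gRPC test
    # runner): run two trivial POSIX commands as jobs and report how many
    # failed. The 'echo' and 'sleep' command lines and shortnames are
    # placeholders, not real test targets.
    example_specs = [
        JobSpec(['echo', 'hello'], shortname='demo.echo'),
        JobSpec(['sleep', '1'], shortname='demo.sleep', timeout_seconds=30),
    ]
    num_failures, resultset = run(example_specs,
                                  maxjobs=2,
                                  newline_on_success=True)
    print('jobs: %d, failures: %d' % (len(resultset), num_failures))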