# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Run a group of subprocesses and then finish."""

import errno
import logging
import multiprocessing
import os
import platform
import re
import signal
import subprocess
import sys
import tempfile
import time

# cpu cost measurement
measure_cpu_costs = False

_DEFAULT_MAX_JOBS = 16 * multiprocessing.cpu_count()
# Maximum number of bytes of a job's stdout that will be stored in the result.
# Only the last N bytes of stdout are kept if the actual output is longer.
_MAX_RESULT_SIZE = 64 * 1024


# NOTE: If you change this, please make sure to test reviewing the
# GitHub PR with http://reviewable.io, which is known to add UTF-8
# characters to the PR description; these leak into the environment here
# and cause failures.
def strip_non_ascii_chars(s):
    return ''.join(c for c in s if ord(c) < 128)


def sanitized_environment(env):
    sanitized = {}
    for key, value in env.items():
        sanitized[strip_non_ascii_chars(key)] = strip_non_ascii_chars(value)
    return sanitized


def platform_string():
    if platform.system() == 'Windows':
        return 'windows'
    elif platform.system()[:7] == 'MSYS_NT':
        return 'windows'
    elif platform.system() == 'Darwin':
        return 'mac'
    elif platform.system() == 'Linux':
        return 'linux'
    else:
        return 'posix'


# Set up a signal handler so that signal.pause registers 'something'
# when a child finishes.
# Not using futures and threading to avoid a dependency on subprocess32.
if platform_string() == 'windows':
    pass
else:

    def alarm_handler(unused_signum, unused_frame):
        pass

    signal.signal(signal.SIGCHLD, lambda unused_signum, unused_frame: None)
    signal.signal(signal.SIGALRM, alarm_handler)

_SUCCESS = object()
_FAILURE = object()
_RUNNING = object()
_KILLED = object()

_COLORS = {
    'red': [31, 0],
    'green': [32, 0],
    'yellow': [33, 0],
    'lightgray': [37, 0],
    'gray': [30, 1],
    'purple': [35, 0],
    'cyan': [36, 0]
}

_BEGINNING_OF_LINE = '\x1b[0G'
_CLEAR_LINE = '\x1b[2K'

_TAG_COLOR = {
    'FAILED': 'red',
    'FLAKE': 'purple',
    'TIMEOUT_FLAKE': 'purple',
    'WARNING': 'yellow',
    'TIMEOUT': 'red',
    'PASSED': 'green',
    'START': 'gray',
    'WAITING': 'yellow',
    'SUCCESS': 'green',
    'IDLE': 'gray',
    'SKIPPED': 'cyan'
}

_FORMAT = '%(asctime)-15s %(message)s'
logging.basicConfig(level=logging.INFO, format=_FORMAT)


def eintr_be_gone(fn):
    """Run fn until it doesn't stop because of EINTR."""
    while True:
        try:
            return fn()
        except IOError as e:
            if e.errno != errno.EINTR:
                raise


def message(tag, msg, explanatory_text=None, do_newline=False):
    if message.old_tag == tag and message.old_msg == msg and not explanatory_text:
        return
    message.old_tag = tag
    message.old_msg = msg
    while True:
        try:
            if platform_string() == 'windows' or not sys.stdout.isatty():
                if explanatory_text:
                    logging.info(explanatory_text)
                logging.info('%s: %s', tag, msg)
            else:
                sys.stdout.write(
                    '%s%s%s\x1b[%d;%dm%s\x1b[0m: %s%s' %
                    (_BEGINNING_OF_LINE, _CLEAR_LINE,
                     '\n%s' % explanatory_text
                     if explanatory_text is not None else '',
                     _COLORS[_TAG_COLOR[tag]][1], _COLORS[_TAG_COLOR[tag]][0],
                     tag, msg,
                     '\n' if do_newline or explanatory_text is not None else ''))
                sys.stdout.flush()
            return
        except IOError as e:
            if e.errno != errno.EINTR:
                raise


message.old_tag = ''
message.old_msg = ''


def which(filename):
    if '/' in filename:
        return filename
    for path in os.environ['PATH'].split(os.pathsep):
        if os.path.exists(os.path.join(path, filename)):
            return os.path.join(path, filename)
    raise Exception('%s not found' % filename)


class JobSpec(object):
    """Specifies what to run for a job."""

    def __init__(self,
                 cmdline,
                 shortname=None,
                 environ=None,
                 cwd=None,
                 shell=False,
                 timeout_seconds=5 * 60,
                 flake_retries=0,
                 timeout_retries=0,
                 kill_handler=None,
                 cpu_cost=1.0,
                 verbose_success=False,
                 logfilename=None):
        """
        Arguments:
          cmdline: a list of arguments to pass as the command line
          environ: a dictionary of environment variables to set in the child process
          kill_handler: a handler that will be called whenever job.kill() is invoked
          cpu_cost: number of cores per second this job needs
          logfilename: use given file to store job's output, rather than using a temporary file
        """
        if environ is None:
            environ = {}
        self.cmdline = cmdline
        self.environ = environ
        self.shortname = cmdline[0] if shortname is None else shortname
        self.cwd = cwd
        self.shell = shell
        self.timeout_seconds = timeout_seconds
        self.flake_retries = flake_retries
        self.timeout_retries = timeout_retries
        self.kill_handler = kill_handler
        self.cpu_cost = cpu_cost
        self.verbose_success = verbose_success
        self.logfilename = logfilename
        if self.logfilename and (self.flake_retries != 0 or
                                 self.timeout_retries != 0):
            # Forbidden to avoid overwriting the test log when retrying.
            raise Exception(
                'Cannot use custom logfile when retries are enabled')

    def identity(self):
        return '%r %r' % (self.cmdline, self.environ)

    def __hash__(self):
        return hash(self.identity())

    def __eq__(self, other):
        return self.identity() == other.identity()

    def __repr__(self):
        return 'JobSpec(shortname=%s, cmdline=%s)' % (self.shortname,
                                                      self.cmdline)

    def __str__(self):
        return '%s: %s %s' % (self.shortname, ' '.join(
            '%s=%s' % kv for kv in self.environ.items()), ' '.join(
                self.cmdline))


class JobResult(object):

    def __init__(self):
        self.state = 'UNKNOWN'
        self.returncode = -1
        self.elapsed_time = 0
        self.num_failures = 0
        self.retries = 0
        self.message = ''
        self.cpu_estimated = 1
        self.cpu_measured = 1


def read_from_start(f):
    f.seek(0)
    return f.read()


class Job(object):
    """Manages one job."""

    def __init__(self,
                 spec,
                 newline_on_success,
                 travis,
                 add_env,
                 quiet_success=False):
        self._spec = spec
        self._newline_on_success = newline_on_success
        self._travis = travis
        self._add_env = add_env.copy()
        self._retries = 0
        self._timeout_retries = 0
        self._suppress_failure_message = False
        self._quiet_success = quiet_success
        if not self._quiet_success:
            message('START', spec.shortname, do_newline=self._travis)
        self.result = JobResult()
        self.start()

    def GetSpec(self):
        return self._spec

    def start(self):
        if self._spec.logfilename:
            # make sure the log directory exists
            logfile_dir = os.path.dirname(
                os.path.abspath(self._spec.logfilename))
            if not os.path.exists(logfile_dir):
                os.makedirs(logfile_dir)
            self._logfile = open(self._spec.logfilename, 'w+')
        else:
            self._logfile = tempfile.TemporaryFile()
        env = dict(os.environ)
        env.update(self._spec.environ)
        env.update(self._add_env)
        env = sanitized_environment(env)
        self._start = time.time()
        cmdline = self._spec.cmdline
        # The Unix time command is finicky when used with MSBuild, so we don't
        # use it with jobs that run MSBuild.
        global measure_cpu_costs
        if measure_cpu_costs and 'vsprojects\\build' not in cmdline[0]:
            cmdline = ['time', '-p'] + cmdline
        else:
            measure_cpu_costs = False
        try_start = lambda: subprocess.Popen(args=cmdline,
                                             stderr=subprocess.STDOUT,
                                             stdout=self._logfile,
                                             cwd=self._spec.cwd,
                                             shell=self._spec.shell,
                                             env=env)
        delay = 0.3
        for _ in range(4):
            try:
                self._process = try_start()
                break
            except OSError:
                message(
                    'WARNING', 'Failed to start %s, retrying in %f seconds' %
                    (self._spec.shortname, delay))
                time.sleep(delay)
                delay *= 2
        else:
            self._process = try_start()
        self._state = _RUNNING

    def state(self):
        """Poll current state of the job. Prints messages at completion."""

        def stdout(self=self):
            stdout = read_from_start(self._logfile)
            self.result.message = stdout[-_MAX_RESULT_SIZE:]
            return stdout

        if self._state == _RUNNING and self._process.poll() is not None:
            elapsed = time.time() - self._start
            self.result.elapsed_time = elapsed
            if self._process.returncode != 0:
                if self._retries < self._spec.flake_retries:
                    message('FLAKE',
                            '%s [ret=%d, pid=%d]' %
                            (self._spec.shortname, self._process.returncode,
                             self._process.pid),
                            stdout(),
                            do_newline=True)
                    self._retries += 1
                    self.result.num_failures += 1
                    self.result.retries = self._timeout_retries + self._retries
                    # NOTE: job is restarted regardless of jobset's max_time setting
                    self.start()
                else:
                    self._state = _FAILURE
                    if not self._suppress_failure_message:
                        message('FAILED',
                                '%s [ret=%d, pid=%d, time=%.1fsec]' %
                                (self._spec.shortname, self._process.returncode,
                                 self._process.pid, elapsed),
                                stdout(),
                                do_newline=True)
                    self.result.state = 'FAILED'
                    self.result.num_failures += 1
                    self.result.returncode = self._process.returncode
            else:
                self._state = _SUCCESS
                measurement = ''
                if measure_cpu_costs:
                    m = re.search(
                        r'real\s+([0-9.]+)\nuser\s+([0-9.]+)\nsys\s+([0-9.]+)',
                        stdout())
                    real = float(m.group(1))
                    user = float(m.group(2))
                    sys = float(m.group(3))
                    if real > 0.5:
                        cores = (user + sys) / real
                        self.result.cpu_measured = float('%.01f' % cores)
                        self.result.cpu_estimated = float('%.01f' %
                                                          self._spec.cpu_cost)
                        measurement = '; cpu_cost=%.01f; estimated=%.01f' % (
                            self.result.cpu_measured, self.result.cpu_estimated)
                if not self._quiet_success:
                    message('PASSED',
                            '%s [time=%.1fsec, retries=%d:%d%s]' %
                            (self._spec.shortname, elapsed, self._retries,
                             self._timeout_retries, measurement),
                            stdout() if self._spec.verbose_success else None,
                            do_newline=self._newline_on_success or self._travis)
                self.result.state = 'PASSED'
        elif (self._state == _RUNNING and
              self._spec.timeout_seconds is not None and
              time.time() - self._start > self._spec.timeout_seconds):
            elapsed = time.time() - self._start
            self.result.elapsed_time = elapsed
            if self._timeout_retries < self._spec.timeout_retries:
                message('TIMEOUT_FLAKE',
                        '%s [pid=%d]' % (self._spec.shortname,
                                         self._process.pid),
                        stdout(),
                        do_newline=True)
                self._timeout_retries += 1
                self.result.num_failures += 1
                self.result.retries = self._timeout_retries + self._retries
                if self._spec.kill_handler:
                    self._spec.kill_handler(self)
                self._process.terminate()
                # NOTE: job is restarted regardless of jobset's max_time setting
                self.start()
            else:
                message('TIMEOUT',
                        '%s [pid=%d, time=%.1fsec]' %
                        (self._spec.shortname, self._process.pid, elapsed),
                        stdout(),
                        do_newline=True)
                self.kill()
                self.result.state = 'TIMEOUT'
                self.result.num_failures += 1
        return self._state

    def kill(self):
        if self._state == _RUNNING:
            self._state = _KILLED
            if self._spec.kill_handler:
                self._spec.kill_handler(self)
            self._process.terminate()

    def suppress_failure_message(self):
        self._suppress_failure_message = True


class Jobset(object):
    """Manages one run of jobs."""

    def __init__(self, check_cancelled, maxjobs, maxjobs_cpu_agnostic,
                 newline_on_success, travis, stop_on_failure, add_env,
                 quiet_success, max_time):
        self._running = set()
        self._check_cancelled = check_cancelled
        self._cancelled = False
        self._failures = 0
        self._completed = 0
        self._maxjobs = maxjobs
        self._maxjobs_cpu_agnostic = maxjobs_cpu_agnostic
        self._newline_on_success = newline_on_success
        self._travis = travis
        self._stop_on_failure = stop_on_failure
        self._add_env = add_env
        self._quiet_success = quiet_success
        self._max_time = max_time
        self.resultset = {}
        self._remaining = None
        self._start_time = time.time()

    def set_remaining(self, remaining):
        self._remaining = remaining

    def get_num_failures(self):
        return self._failures

    def cpu_cost(self):
        c = 0
        for job in self._running:
            c += job._spec.cpu_cost
        return c

    def start(self, spec):
        """Start a job. Return True on success, False on failure."""
        while True:
            if (self._max_time > 0 and
                    time.time() - self._start_time > self._max_time):
                skipped_job_result = JobResult()
                skipped_job_result.state = 'SKIPPED'
                message('SKIPPED', spec.shortname, do_newline=True)
                self.resultset[spec.shortname] = [skipped_job_result]
                return True
            if self.cancelled(): return False
            current_cpu_cost = self.cpu_cost()
            if current_cpu_cost == 0: break
            if current_cpu_cost + spec.cpu_cost <= self._maxjobs:
                if len(self._running) < self._maxjobs_cpu_agnostic:
                    break
            self.reap(spec.shortname, spec.cpu_cost)
        if self.cancelled(): return False
        job = Job(spec, self._newline_on_success, self._travis, self._add_env,
                  self._quiet_success)
        self._running.add(job)
        if job.GetSpec().shortname not in self.resultset:
            self.resultset[job.GetSpec().shortname] = []
        return True

    def reap(self, waiting_for=None, waiting_for_cost=None):
        """Collect the dead jobs."""
        while self._running:
            dead = set()
            for job in self._running:
                st = eintr_be_gone(lambda: job.state())
                if st == _RUNNING: continue
                if st == _FAILURE or st == _KILLED:
                    self._failures += 1
                    if self._stop_on_failure:
                        self._cancelled = True
                        # Kill everything else, but keep 'job' bound to the
                        # job whose failure triggered the cancellation.
                        for running_job in self._running:
                            running_job.kill()
                dead.add(job)
                break
            for job in dead:
                self._completed += 1
                if not self._quiet_success or job.result.state != 'PASSED':
                    self.resultset[job.GetSpec().shortname].append(job.result)
                self._running.remove(job)
            if dead: return
            if not self._travis and platform_string() != 'windows':
                rstr = '' if self._remaining is None else '%d queued, ' % self._remaining
                if self._remaining is not None and self._completed > 0:
                    now = time.time()
                    sofar = now - self._start_time
                    remaining = sofar / self._completed * (self._remaining +
                                                           len(self._running))
                    rstr = 'ETA %.1f sec; %s' % (remaining, rstr)
                if waiting_for is not None:
                    wstr = ' next: %s @ %.2f cpu' % (waiting_for,
                                                     waiting_for_cost)
                else:
                    wstr = ''
                message(
                    'WAITING',
                    '%s%d jobs running, %d complete, %d failed (load %.2f)%s' %
                    (rstr, len(self._running), self._completed, self._failures,
                     self.cpu_cost(), wstr))
            if platform_string() == 'windows':
                time.sleep(0.1)
            else:
                signal.alarm(10)
                signal.pause()

    def cancelled(self):
        """Poll for cancellation."""
        if self._cancelled: return True
        if not self._check_cancelled(): return False
        for job in self._running:
            job.kill()
        self._cancelled = True
        return True

    def finish(self):
        while self._running:
            if self.cancelled(): pass  # poll cancellation
            self.reap()
        if platform_string() != 'windows':
            signal.alarm(0)
        return not self.cancelled() and self._failures == 0


def _never_cancelled():
    return False


def tag_remaining(xs):
    """Yields (item, remaining) pairs for xs.

    remaining is the number of items still to come after this one, or None
    when that count is not yet known (only the trailing items of a long input
    get an exact count).
    """
    staging = []
    for x in xs:
        staging.append(x)
        if len(staging) > 5000:
            yield (staging.pop(0), None)
    n = len(staging)
    for i, x in enumerate(staging):
        yield (x, n - i - 1)


def run(cmdlines,
        check_cancelled=_never_cancelled,
        maxjobs=None,
        maxjobs_cpu_agnostic=None,
        newline_on_success=False,
        travis=False,
        infinite_runs=False,
        stop_on_failure=False,
        add_env={},
        skip_jobs=False,
        quiet_success=False,
        max_time=-1):
    """Runs a list of JobSpecs; returns (num_failures, resultset)."""
    if skip_jobs:
        resultset = {}
        skipped_job_result = JobResult()
        skipped_job_result.state = 'SKIPPED'
        for job in cmdlines:
            message('SKIPPED', job.shortname, do_newline=True)
            resultset[job.shortname] = [skipped_job_result]
        return 0, resultset
    js = Jobset(
        check_cancelled, maxjobs if maxjobs is not None else _DEFAULT_MAX_JOBS,
        maxjobs_cpu_agnostic if maxjobs_cpu_agnostic is not None else
        _DEFAULT_MAX_JOBS, newline_on_success, travis, stop_on_failure, add_env,
        quiet_success, max_time)
    for cmdline, remaining in tag_remaining(cmdlines):
        if not js.start(cmdline):
            break
        if remaining is not None:
            js.set_remaining(remaining)
    js.finish()
    return js.get_num_failures(), js.resultset
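

# A minimal usage sketch (illustrative only, not part of the original module):
# it shows how a caller might build JobSpec objects and hand them to run().
# The command lines and shortnames below are hypothetical examples.
if __name__ == '__main__':
    example_specs = [
        JobSpec(['echo', 'hello'], shortname='hello'),
        JobSpec(['sleep', '1'], shortname='sleep_one', timeout_seconds=30),
    ]
    num_failures, resultset = run(example_specs,
                                  maxjobs=2,
                                  newline_on_success=True)
    # resultset maps each job's shortname to a list of JobResult objects.
    for name, results in resultset.items():
        print('%s: %s' % (name, [r.state for r in results]))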