# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Run a group of subprocesses and then finish."""

import errno
import logging
import multiprocessing
import os
import platform
import re
import signal
import subprocess
import sys
import tempfile
import time

# cpu cost measurement
measure_cpu_costs = False

_DEFAULT_MAX_JOBS = 16 * multiprocessing.cpu_count()
# Maximum number of bytes of a job's stdout that will be stored in the result.
# Only the last N bytes of stdout are kept if the actual output is longer.
_MAX_RESULT_SIZE = 64 * 1024


# NOTE: If you change this, please make sure to test reviewing the
# github PR with http://reviewable.io, which is known to add UTF-8
# characters to the PR description, which leak into the environment here
# and cause failures.
def strip_non_ascii_chars(s):
    return ''.join(c for c in s if ord(c) < 128)


def sanitized_environment(env):
    sanitized = {}
    for key, value in list(env.items()):
        sanitized[strip_non_ascii_chars(key)] = strip_non_ascii_chars(value)
    return sanitized


def platform_string():
    if platform.system() == 'Windows':
        return 'windows'
    elif platform.system()[:7] == 'MSYS_NT':
        return 'windows'
    elif platform.system() == 'Darwin':
        return 'mac'
    elif platform.system() == 'Linux':
        return 'linux'
    else:
        return 'posix'


# Set up a signal handler so that signal.pause registers 'something'
# when a child finishes.
# Not using futures and threading to avoid a dependency on subprocess32.
if platform_string() == 'windows':
    pass
else:

    def alarm_handler(unused_signum, unused_frame):
        pass

    signal.signal(signal.SIGCHLD, lambda unused_signum, unused_frame: None)
    signal.signal(signal.SIGALRM, alarm_handler)

_SUCCESS = object()
_FAILURE = object()
_RUNNING = object()
_KILLED = object()

_COLORS = {
    'red': [31, 0],
    'green': [32, 0],
    'yellow': [33, 0],
    'lightgray': [37, 0],
    'gray': [30, 1],
    'purple': [35, 0],
    'cyan': [36, 0]
}

_BEGINNING_OF_LINE = '\x1b[0G'
_CLEAR_LINE = '\x1b[2K'

_TAG_COLOR = {
    'FAILED': 'red',
    'FLAKE': 'purple',
    'TIMEOUT_FLAKE': 'purple',
    'WARNING': 'yellow',
    'TIMEOUT': 'red',
    'PASSED': 'green',
    'START': 'gray',
    'WAITING': 'yellow',
    'SUCCESS': 'green',
    'IDLE': 'gray',
    'SKIPPED': 'cyan'
}

_FORMAT = '%(asctime)-15s %(message)s'
logging.basicConfig(level=logging.INFO, format=_FORMAT)


def eintr_be_gone(fn):
    """Run fn until it doesn't stop because of EINTR."""
    while True:
        try:
            return fn()
        except IOError as e:
            if e.errno != errno.EINTR:
                raise
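
# Usage note (added commentary; the call below mirrors how reap() uses this
# helper later in this file): blocking calls that can be interrupted by the
# SIGCHLD/SIGALRM handlers installed above are wrapped like
#
#   st = eintr_be_gone(lambda: job.state())
#
# so that an EINTR causes the call to be retried instead of raising.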


def message(tag, msg, explanatory_text=None, do_newline=False):
    if message.old_tag == tag and message.old_msg == msg and not explanatory_text:
        return
    message.old_tag = tag
    message.old_msg = msg
    if explanatory_text:
        if isinstance(explanatory_text, bytes):
            explanatory_text = explanatory_text.decode('utf8', errors='replace')
    while True:
        try:
            if platform_string() == 'windows' or not sys.stdout.isatty():
                if explanatory_text:
                    logging.info(explanatory_text)
                logging.info('%s: %s', tag, msg)
            else:
                sys.stdout.write(
                    '%s%s%s\x1b[%d;%dm%s\x1b[0m: %s%s' %
                    (_BEGINNING_OF_LINE, _CLEAR_LINE, '\n%s' %
                     explanatory_text if explanatory_text is not None else '',
                     _COLORS[_TAG_COLOR[tag]][1], _COLORS[_TAG_COLOR[tag]][0],
                     tag, msg, '\n'
                     if do_newline or explanatory_text is not None else ''))
                sys.stdout.flush()
            return
        except IOError as e:
            if e.errno != errno.EINTR:
                raise


message.old_tag = ''
message.old_msg = ''


def which(filename):
    if '/' in filename:
        return filename
    for path in os.environ['PATH'].split(os.pathsep):
        if os.path.exists(os.path.join(path, filename)):
            return os.path.join(path, filename)
    raise Exception('%s not found' % filename)


class JobSpec(object):
    """Specifies what to run for a job."""

    def __init__(self,
                 cmdline,
                 shortname=None,
                 environ=None,
                 cwd=None,
                 shell=False,
                 timeout_seconds=5 * 60,
                 flake_retries=0,
                 timeout_retries=0,
                 kill_handler=None,
                 cpu_cost=1.0,
                 verbose_success=False,
                 logfilename=None):
        """
        Arguments:
          cmdline: a list of arguments to pass as the command line
          environ: a dictionary of environment variables to set in the child process
          kill_handler: a handler that will be called whenever job.kill() is invoked
          cpu_cost: number of CPU cores this job is expected to consume while running
          logfilename: use the given file to store the job's output, rather than a temporary file
        """
        if environ is None:
            environ = {}
        self.cmdline = cmdline
        self.environ = environ
        self.shortname = cmdline[0] if shortname is None else shortname
        self.cwd = cwd
        self.shell = shell
        self.timeout_seconds = timeout_seconds
        self.flake_retries = flake_retries
        self.timeout_retries = timeout_retries
        self.kill_handler = kill_handler
        self.cpu_cost = cpu_cost
        self.verbose_success = verbose_success
        self.logfilename = logfilename
        if self.logfilename and (self.flake_retries != 0 or
                                 self.timeout_retries != 0):
            # Forbidden to avoid overwriting the test log when retrying.
            raise Exception(
                'Cannot use custom logfile when retries are enabled')

    def identity(self):
        return '%r %r' % (self.cmdline, self.environ)

    def __hash__(self):
        return hash(self.identity())

    def __eq__(self, other):
        return self.identity() == other.identity()

    def __lt__(self, other):
        return self.identity() < other.identity()

    def __repr__(self):
        return 'JobSpec(shortname=%s, cmdline=%s)' % (self.shortname,
                                                      self.cmdline)

    def __str__(self):
        return '%s: %s %s' % (self.shortname, ' '.join(
            '%s=%s' % kv for kv in list(self.environ.items())), ' '.join(
                self.cmdline))
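
# Illustrative sketch (hypothetical values; not used anywhere in this module):
# a JobSpec for a test binary with a 30 second timeout and one flake retry
# could be constructed roughly like
#
#   spec = JobSpec(cmdline=['./path/to/test_binary', '--some_flag'],
#                  shortname='test_binary',
#                  environ={'SOME_ENV_VAR': 'value'},
#                  timeout_seconds=30,
#                  flake_retries=1,
#                  cpu_cost=2.0)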


class JobResult(object):

    def __init__(self):
        self.state = 'UNKNOWN'
        self.returncode = -1
        self.elapsed_time = 0
        self.num_failures = 0
        self.retries = 0
        self.message = ''
        self.cpu_estimated = 1
        self.cpu_measured = 1


def read_from_start(f):
    f.seek(0)
    return f.read()


class Job(object):
    """Manages one job."""

    def __init__(self,
                 spec,
                 newline_on_success,
                 travis,
                 add_env,
                 quiet_success=False):
        self._spec = spec
        self._newline_on_success = newline_on_success
        self._travis = travis
        self._add_env = add_env.copy()
        self._retries = 0
        self._timeout_retries = 0
        self._suppress_failure_message = False
        self._quiet_success = quiet_success
        if not self._quiet_success:
            message('START', spec.shortname, do_newline=self._travis)
        self.result = JobResult()
        self.start()

    def GetSpec(self):
        return self._spec

    def start(self):
        if self._spec.logfilename:
            # make sure the log directory exists
            logfile_dir = os.path.dirname(
                os.path.abspath(self._spec.logfilename))
            if not os.path.exists(logfile_dir):
                os.makedirs(logfile_dir)
            self._logfile = open(self._spec.logfilename, 'w+')
        else:
            # macOS: a series of quick os.unlink invocations might cause an OS
            # error during the creation of a temporary file. By using
            # NamedTemporaryFile, we defer the removal of the file and directory.
            self._logfile = tempfile.NamedTemporaryFile()
        env = dict(os.environ)
        env.update(self._spec.environ)
        env.update(self._add_env)
        env = sanitized_environment(env)
        self._start = time.time()
        cmdline = self._spec.cmdline
        # The Unix time command is finicky when used with MSBuild, so we don't
        # use it with jobs that run MSBuild.
        global measure_cpu_costs
        if measure_cpu_costs and 'vsprojects\\build' not in cmdline[0]:
            cmdline = ['time', '-p'] + cmdline
        else:
            measure_cpu_costs = False
        try_start = lambda: subprocess.Popen(args=cmdline,
                                             stderr=subprocess.STDOUT,
                                             stdout=self._logfile,
                                             cwd=self._spec.cwd,
                                             shell=self._spec.shell,
                                             env=env)
        # Retry a failed launch with exponential backoff before making one
        # final attempt whose OSError is allowed to propagate.
        delay = 0.3
        for i in range(0, 4):
            try:
                self._process = try_start()
                break
            except OSError:
                message(
                    'WARNING', 'Failed to start %s, retrying in %f seconds' %
                    (self._spec.shortname, delay))
                time.sleep(delay)
                delay *= 2
        else:
            self._process = try_start()
        self._state = _RUNNING
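
    # Descriptive note (added for clarity): a Job begins in _RUNNING; once
    # state() sees the subprocess exit or exceed its timeout it moves to
    # _SUCCESS, _FAILURE or, via kill(), _KILLED. Flake and timeout retries
    # call start() again, which returns the job to _RUNNING.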

    def state(self):
        """Poll current state of the job. Prints messages at completion."""

        def stdout(self=self):
            stdout = read_from_start(self._logfile)
            self.result.message = stdout[-_MAX_RESULT_SIZE:]
            return stdout

        if self._state == _RUNNING and self._process.poll() is not None:
            elapsed = time.time() - self._start
            self.result.elapsed_time = elapsed
            if self._process.returncode != 0:
                if self._retries < self._spec.flake_retries:
                    message('FLAKE',
                            '%s [ret=%d, pid=%d]' %
                            (self._spec.shortname, self._process.returncode,
                             self._process.pid),
                            stdout(),
                            do_newline=True)
                    self._retries += 1
                    self.result.num_failures += 1
                    self.result.retries = self._timeout_retries + self._retries
                    # NOTE: job is restarted regardless of jobset's max_time setting
                    self.start()
                else:
                    self._state = _FAILURE
                    if not self._suppress_failure_message:
                        message('FAILED',
                                '%s [ret=%d, pid=%d, time=%.1fsec]' %
                                (self._spec.shortname, self._process.returncode,
                                 self._process.pid, elapsed),
                                stdout(),
                                do_newline=True)
                    self.result.state = 'FAILED'
                    self.result.num_failures += 1
                    self.result.returncode = self._process.returncode
            else:
                self._state = _SUCCESS
                measurement = ''
                if measure_cpu_costs:
                    # 'time -p' writes POSIX-format timing lines (e.g.
                    # "real 1.23", "user 0.98", "sys 0.11") to stderr, which
                    # is merged into the captured output; the regex below
                    # extracts them.
                    m = re.search(
                        r'real\s+([0-9.]+)\nuser\s+([0-9.]+)\nsys\s+([0-9.]+)',
                        (stdout()).decode('utf8', errors='replace'))
                    real = float(m.group(1))
                    user = float(m.group(2))
                    sys = float(m.group(3))
                    if real > 0.5:
                        cores = (user + sys) / real
                        self.result.cpu_measured = float('%.01f' % cores)
                        self.result.cpu_estimated = float('%.01f' %
                                                          self._spec.cpu_cost)
                        measurement = '; cpu_cost=%.01f; estimated=%.01f' % (
                            self.result.cpu_measured,
                            self.result.cpu_estimated)
                if not self._quiet_success:
                    message('PASSED',
                            '%s [time=%.1fsec, retries=%d:%d%s]' %
                            (self._spec.shortname, elapsed, self._retries,
                             self._timeout_retries, measurement),
                            stdout() if self._spec.verbose_success else None,
                            do_newline=self._newline_on_success or self._travis)
                self.result.state = 'PASSED'
        elif (self._state == _RUNNING and
              self._spec.timeout_seconds is not None and
              time.time() - self._start > self._spec.timeout_seconds):
            elapsed = time.time() - self._start
            self.result.elapsed_time = elapsed
            if self._timeout_retries < self._spec.timeout_retries:
                message('TIMEOUT_FLAKE',
                        '%s [pid=%d]' %
                        (self._spec.shortname, self._process.pid),
                        stdout(),
                        do_newline=True)
                self._timeout_retries += 1
                self.result.num_failures += 1
                self.result.retries = self._timeout_retries + self._retries
                if self._spec.kill_handler:
                    self._spec.kill_handler(self)
                self._process.terminate()
                # NOTE: job is restarted regardless of jobset's max_time setting
                self.start()
            else:
                message('TIMEOUT',
                        '%s [pid=%d, time=%.1fsec]' %
                        (self._spec.shortname, self._process.pid, elapsed),
                        stdout(),
                        do_newline=True)
                self.kill()
                self.result.state = 'TIMEOUT'
                self.result.num_failures += 1
        return self._state

    def kill(self):
        if self._state == _RUNNING:
            self._state = _KILLED
            if self._spec.kill_handler:
                self._spec.kill_handler(self)
            self._process.terminate()

    def suppress_failure_message(self):
        self._suppress_failure_message = True
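
# Scheduling note (added commentary): Jobset.start admits a new job only when
# nothing is running, or when the summed cpu_cost of the running jobs plus the
# new job's cpu_cost stays within the maxjobs budget and the plain job count
# stays below maxjobs_cpu_agnostic. For example, with maxjobs=16 and three
# running jobs of cpu_cost 4.0 (total 12.0), a new cpu_cost 4.0 job is
# admitted immediately, while a cpu_cost 8.0 job waits in reap() until
# capacity frees up.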


class Jobset(object):
    """Manages one run of jobs."""

    def __init__(self, check_cancelled, maxjobs, maxjobs_cpu_agnostic,
                 newline_on_success, travis, stop_on_failure, add_env,
                 quiet_success, max_time):
        self._running = set()
        self._check_cancelled = check_cancelled
        self._cancelled = False
        self._failures = 0
        self._completed = 0
        self._maxjobs = maxjobs
        self._maxjobs_cpu_agnostic = maxjobs_cpu_agnostic
        self._newline_on_success = newline_on_success
        self._travis = travis
        self._stop_on_failure = stop_on_failure
        self._add_env = add_env
        self._quiet_success = quiet_success
        self._max_time = max_time
        self.resultset = {}
        self._remaining = None
        self._start_time = time.time()

    def set_remaining(self, remaining):
        self._remaining = remaining

    def get_num_failures(self):
        return self._failures

    def cpu_cost(self):
        c = 0
        for job in self._running:
            c += job._spec.cpu_cost
        return c

    def start(self, spec):
        """Start a job. Return True on success, False on failure."""
        while True:
            if (self._max_time > 0 and
                    time.time() - self._start_time > self._max_time):
                skipped_job_result = JobResult()
                skipped_job_result.state = 'SKIPPED'
                message('SKIPPED', spec.shortname, do_newline=True)
                self.resultset[spec.shortname] = [skipped_job_result]
                return True
            if self.cancelled():
                return False
            current_cpu_cost = self.cpu_cost()
            if current_cpu_cost == 0:
                break
            if current_cpu_cost + spec.cpu_cost <= self._maxjobs:
                if len(self._running) < self._maxjobs_cpu_agnostic:
                    break
            self.reap(spec.shortname, spec.cpu_cost)
        if self.cancelled():
            return False
        job = Job(spec, self._newline_on_success, self._travis, self._add_env,
                  self._quiet_success)
        self._running.add(job)
        if job.GetSpec().shortname not in self.resultset:
            self.resultset[job.GetSpec().shortname] = []
        return True

    def reap(self, waiting_for=None, waiting_for_cost=None):
        """Collect the dead jobs."""
        while self._running:
            dead = set()
            for job in self._running:
                st = eintr_be_gone(lambda: job.state())
                if st == _RUNNING:
                    continue
                if st == _FAILURE or st == _KILLED:
                    self._failures += 1
                    if self._stop_on_failure:
                        self._cancelled = True
                        for job in self._running:
                            job.kill()
                dead.add(job)
                break
            for job in dead:
                self._completed += 1
                if not self._quiet_success or job.result.state != 'PASSED':
                    self.resultset[job.GetSpec().shortname].append(job.result)
                self._running.remove(job)
            if dead:
                return
            if not self._travis and platform_string() != 'windows':
                rstr = '' if self._remaining is None else '%d queued, ' % self._remaining
                if self._remaining is not None and self._completed > 0:
                    now = time.time()
                    sofar = now - self._start_time
                    remaining = sofar / self._completed * (self._remaining +
                                                           len(self._running))
                    rstr = 'ETA %.1f sec; %s' % (remaining, rstr)
                if waiting_for is not None:
                    wstr = ' next: %s @ %.2f cpu' % (waiting_for,
                                                     waiting_for_cost)
                else:
                    wstr = ''
                message(
                    'WAITING',
                    '%s%d jobs running, %d complete, %d failed (load %.2f)%s' %
                    (rstr, len(self._running), self._completed, self._failures,
                     self.cpu_cost(), wstr))
            if platform_string() == 'windows':
                time.sleep(0.1)
            else:
                signal.alarm(10)
                signal.pause()

    def cancelled(self):
        """Poll for cancellation."""
        if self._cancelled:
            return True
        if not self._check_cancelled():
            return False
        for job in self._running:
            job.kill()
        self._cancelled = True
        return True

    def finish(self):
        while self._running:
            if self.cancelled():
                pass  # poll cancellation
            self.reap()
        if platform_string() != 'windows':
            signal.alarm(0)
        return not self.cancelled() and self._failures == 0


def _never_cancelled():
    return False


def tag_remaining(xs):
    """Yield (item, remaining_count) pairs for the items of xs.

    Only about 5000 items are buffered at a time; items yielded while the
    end of the input is still unknown are tagged with None instead of a
    remaining count."""
    staging = []
    for x in xs:
        staging.append(x)
        if len(staging) > 5000:
            yield (staging.pop(0), None)
    n = len(staging)
    for i, x in enumerate(staging):
        yield (x, n - i - 1)


def run(cmdlines,
        check_cancelled=_never_cancelled,
        maxjobs=None,
        maxjobs_cpu_agnostic=None,
        newline_on_success=False,
        travis=False,
        infinite_runs=False,
        stop_on_failure=False,
        add_env={},
        skip_jobs=False,
        quiet_success=False,
        max_time=-1):
    if skip_jobs:
        resultset = {}
        skipped_job_result = JobResult()
        skipped_job_result.state = 'SKIPPED'
        for job in cmdlines:
            message('SKIPPED', job.shortname, do_newline=True)
            resultset[job.shortname] = [skipped_job_result]
        return 0, resultset
    js = Jobset(
        check_cancelled, maxjobs if maxjobs is not None else _DEFAULT_MAX_JOBS,
        maxjobs_cpu_agnostic if maxjobs_cpu_agnostic is not None else
        _DEFAULT_MAX_JOBS, newline_on_success, travis, stop_on_failure, add_env,
        quiet_success, max_time)
    for cmdline, remaining in tag_remaining(cmdlines):
        if not js.start(cmdline):
            break
        if remaining is not None:
            js.set_remaining(remaining)
    js.finish()
    return js.get_num_failures(), js.resultset
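
# End-to-end sketch (assumed caller-side names; illustrative only): a typical
# driver builds a list of JobSpecs and hands it to run(), e.g.
#
#   specs = [JobSpec(['./run_one_test.sh', case], shortname=case)
#            for case in ['test_a', 'test_b']]
#   num_failures, resultset = run(specs, newline_on_success=True, maxjobs=8)
#   if num_failures:
#       sys.exit(1)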