• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Run a group of subprocesses and then finish."""
15
16import errno
17import logging
18import multiprocessing
19import os
20import platform
21import re
22import signal
23import subprocess
24import sys
25import tempfile
26import time
27
# cpu cost measurement
# When True, jobs are launched under the Unix `time -p` command so per-job
# CPU usage can be parsed out of their output (see Job.state). NOTE: this is
# module-level mutable state — Job.start() itself sets it back to False for
# jobs it cannot measure.
measure_cpu_costs = False

# Default cap on concurrently running jobs: 16 subprocesses per CPU core.
_DEFAULT_MAX_JOBS = 16 * multiprocessing.cpu_count()
# Maximum number of bytes of job's stdout that will be stored in the result.
# Only last N bytes of stdout will be kept if the actual output longer.
_MAX_RESULT_SIZE = 64 * 1024
35
36
# NOTE: If you change this, please make sure to test reviewing the
# github PR with http://reviewable.io, which is known to add UTF-8
# characters to the PR description, which leak into the environment here
# and cause failures.
def strip_non_ascii_chars(s):
    """Return *s* with every character whose code point is >= 128 removed."""
    # The ascii codec accepts exactly the code points < 128; 'ignore'
    # silently drops everything else, matching the original filter.
    return s.encode('ascii', errors='ignore').decode('ascii')
43
44
def sanitized_environment(env):
    """Return a copy of *env* with non-ASCII characters stripped from every
    key and value (see strip_non_ascii_chars for why this is necessary)."""
    return {
        strip_non_ascii_chars(key): strip_non_ascii_chars(value)
        for key, value in env.items()
    }
50
51
def platform_string():
    """Return a normalized platform name: 'windows', 'mac', 'linux' or 'posix'.

    MSYS_NT-* (the MSYS2 / Git-for-Windows environment) is treated as
    'windows'. Anything else that is not Windows/Darwin/Linux falls back
    to the generic 'posix'.
    """
    # Hoisted: the original queried platform.system() up to four times.
    system = platform.system()
    if system == 'Windows':
        return 'windows'
    elif system[:7] == 'MSYS_NT':
        return 'windows'
    elif system == 'Darwin':
        return 'mac'
    elif system == 'Linux':
        return 'linux'
    else:
        return 'posix'
63
64
# setup a signal handler so that signal.pause registers 'something'
# when a child finishes
# not using futures and threading to avoid a dependency on subprocess32
if platform_string() == 'windows':
    # Windows has no SIGCHLD/SIGALRM; Jobset.reap polls with time.sleep
    # there instead of signal.pause.
    pass
else:

    # Both handlers are deliberate no-ops: their only purpose is to make a
    # pending SIGCHLD (child exited) or SIGALRM (watchdog set via
    # signal.alarm in Jobset.reap) interrupt a blocking signal.pause().
    def alarm_handler(unused_signum, unused_frame):
        pass

    signal.signal(signal.SIGCHLD, lambda unused_signum, unused_frame: None)
    signal.signal(signal.SIGALRM, alarm_handler)
77
# Opaque sentinels for a job's lifecycle state; compared by identity only.
_SUCCESS = object()
_FAILURE = object()
_RUNNING = object()
_KILLED = object()

# ANSI color table: name -> [SGR color code, bold flag (0 or 1)].
_COLORS = {
    'red': [31, 0],
    'green': [32, 0],
    'yellow': [33, 0],
    'lightgray': [37, 0],
    'gray': [30, 1],
    'purple': [35, 0],
    'cyan': [36, 0]
}

# ANSI escapes used to redraw the current terminal line in place.
_BEGINNING_OF_LINE = '\x1b[0G'
_CLEAR_LINE = '\x1b[2K'

# Maps each status tag printed by message() to a color in _COLORS.
_TAG_COLOR = {
    'FAILED': 'red',
    'FLAKE': 'purple',
    'TIMEOUT_FLAKE': 'purple',
    'WARNING': 'yellow',
    'TIMEOUT': 'red',
    'PASSED': 'green',
    'START': 'gray',
    'WAITING': 'yellow',
    'SUCCESS': 'green',
    'IDLE': 'gray',
    'SKIPPED': 'cyan'
}

# Plain timestamped format used on non-tty / Windows output paths.
_FORMAT = '%(asctime)-15s %(message)s'
logging.basicConfig(level=logging.INFO, format=_FORMAT)
112
113
def eintr_be_gone(fn):
    """Invoke *fn* repeatedly until it completes without being interrupted.

    Retries only on IOError with errno EINTR; any other error propagates.
    """
    while True:
        try:
            return fn()
        except IOError as io_err:
            if io_err.errno == errno.EINTR:
                continue
            raise
122
123
def message(tag, msg, explanatory_text=None, do_newline=False):
    """Print one status line, colorized when stdout is an interactive tty.

    Args:
      tag: status keyword, must be a key of _TAG_COLOR (e.g. 'PASSED').
      msg: short one-line status text.
      explanatory_text: optional multi-line detail (str or bytes; bytes are
        decoded as UTF-8 with replacement).
      do_newline: force a trailing newline so the next message does not
        overwrite this one on the tty path.
    """
    # De-duplicate: repeated identical (tag, msg) updates with no detail text
    # are suppressed; state lives in function attributes set below.
    if message.old_tag == tag and message.old_msg == msg and not explanatory_text:
        return
    message.old_tag = tag
    message.old_msg = msg
    if explanatory_text:
        if isinstance(explanatory_text, bytes):
            explanatory_text = explanatory_text.decode('utf8', errors='replace')
    # Same EINTR-retry shape as eintr_be_gone; kept inline here.
    while True:
        try:
            if platform_string() == 'windows' or not sys.stdout.isatty():
                # Non-tty path: plain logging, no ANSI escapes.
                if explanatory_text:
                    logging.info(explanatory_text)
                logging.info('%s: %s', tag, msg)
            else:
                # Tty path: rewrite the current line in place, rendering the
                # tag in its configured color/boldness (_COLORS/_TAG_COLOR).
                sys.stdout.write(
                    '%s%s%s\x1b[%d;%dm%s\x1b[0m: %s%s' %
                    (_BEGINNING_OF_LINE, _CLEAR_LINE, '\n%s' %
                     explanatory_text if explanatory_text is not None else '',
                     _COLORS[_TAG_COLOR[tag]][1], _COLORS[_TAG_COLOR[tag]][0],
                     tag, msg, '\n'
                     if do_newline or explanatory_text is not None else ''))
            sys.stdout.flush()
            return
        except IOError as e:
            if e.errno != errno.EINTR:
                raise


# Backing state for the de-duplication check at the top of message().
message.old_tag = ''
message.old_msg = ''
155
156
def which(filename):
    """Locate *filename* on the PATH.

    A filename containing '/' is returned unchanged (assumed to already be a
    path). Otherwise each PATH entry is probed in order.

    Raises:
      Exception: if the file is not found on any PATH entry.
    """
    if '/' in filename:
        return filename
    for path in os.environ['PATH'].split(os.pathsep):
        # Hoisted: the original computed os.path.join twice per entry.
        candidate = os.path.join(path, filename)
        if os.path.exists(candidate):
            return candidate
    raise Exception('%s not found' % filename)
164
165
class JobSpec(object):
    """Specifies what to run for a job."""

    def __init__(self,
                 cmdline,
                 shortname=None,
                 environ=None,
                 cwd=None,
                 shell=False,
                 timeout_seconds=5 * 60,
                 flake_retries=0,
                 timeout_retries=0,
                 kill_handler=None,
                 cpu_cost=1.0,
                 verbose_success=False,
                 logfilename=None):
        """
    Arguments:
      cmdline: a list of arguments to pass as the command line
      shortname: display name; defaults to the first cmdline token
      environ: a dictionary of environment variables to set in the child process
      cwd: working directory for the child process
      shell: passed through to subprocess.Popen
      timeout_seconds: kill the job after this long (None disables)
      flake_retries / timeout_retries: how many times to rerun on failure / timeout
      kill_handler: a handler that will be called whenever job.kill() is invoked
      cpu_cost: number of cores per second this job needs
      verbose_success: print the job's output even when it passes
      logfilename: use given file to store job's output, rather than using a temporary file
    """
        if environ is None:
            environ = {}
        self.cmdline = cmdline
        self.environ = environ
        self.shortname = cmdline[0] if shortname is None else shortname
        self.cwd = cwd
        self.shell = shell
        self.timeout_seconds = timeout_seconds
        self.flake_retries = flake_retries
        self.timeout_retries = timeout_retries
        self.kill_handler = kill_handler
        self.cpu_cost = cpu_cost
        self.verbose_success = verbose_success
        self.logfilename = logfilename
        # Forbidden to avoid overwriting the test log when retrying: every
        # restart reopens the logfile with 'w+' (see Job.start), truncating
        # the previous attempt's output. BUGFIX: the original joined the two
        # retry counters with `and`, so the guard only fired when BOTH retry
        # kinds were enabled; either kind alone already overwrites the log.
        if self.logfilename and (self.flake_retries != 0 or
                                 self.timeout_retries != 0):
            raise Exception(
                'Cannot use custom logfile when retries are enabled')

    def identity(self):
        """Key used for hashing/ordering: command line plus environment."""
        return '%r %r' % (self.cmdline, self.environ)

    def __hash__(self):
        return hash(self.identity())

    # NOTE: __cmp__ is a Python-2 leftover and is never invoked by Python 3;
    # kept byte-for-byte for interface compatibility.
    def __cmp__(self, other):
        return self.identity() == other.identity()

    def __lt__(self, other):
        return self.identity() < other.identity()

    def __repr__(self):
        return 'JobSpec(shortname=%s, cmdline=%s)' % (self.shortname,
                                                      self.cmdline)

    def __str__(self):
        return '%s: %s %s' % (self.shortname, ' '.join(
            '%s=%s' % kv for kv in list(self.environ.items())), ' '.join(
                self.cmdline))
229
230
class JobResult(object):
    """Mutable record of a single job's outcome.

    Filled in incrementally by Job.state() and collected into
    Jobset.resultset keyed by the job's shortname.
    """

    def __init__(self):
        # 'UNKNOWN' until completion, then 'PASSED', 'FAILED', 'TIMEOUT'
        # or 'SKIPPED'.
        self.state = 'UNKNOWN'
        # Subprocess exit code; stays -1 unless the job finishes with failure.
        self.returncode = -1
        # Wall-clock seconds from start to completion.
        self.elapsed_time = 0
        # Number of failed attempts (flakes, timeouts and the final failure).
        self.num_failures = 0
        # Total retry attempts performed (flake + timeout retries).
        self.retries = 0
        # Tail of the job's output, capped at _MAX_RESULT_SIZE.
        self.message = ''
        # CPU cores estimated vs. measured (only set when measure_cpu_costs).
        self.cpu_estimated = 1
        self.cpu_measured = 1
242
243
def read_from_start(f):
    """Rewind file object *f* to the beginning and return its full contents."""
    f.seek(0)
    contents = f.read()
    return contents
247
248
class Job(object):
    """Manages one job.

    A Job owns one subprocess at a time plus a logfile capturing its combined
    stdout/stderr, and transparently restarts the subprocess to honor the
    spec's flake/timeout retry budgets. Progress is reported through
    message() and accumulated in self.result (a JobResult).
    """

    def __init__(self,
                 spec,
                 newline_on_success,
                 travis,
                 add_env,
                 quiet_success=False):
        self._spec = spec
        self._newline_on_success = newline_on_success
        self._travis = travis
        # Copied so later mutation by the caller cannot leak into this job.
        self._add_env = add_env.copy()
        self._retries = 0
        self._timeout_retries = 0
        self._suppress_failure_message = False
        self._quiet_success = quiet_success
        if not self._quiet_success:
            message('START', spec.shortname, do_newline=self._travis)
        self.result = JobResult()
        # The subprocess is launched immediately from the constructor.
        self.start()

    def GetSpec(self):
        """Return the JobSpec this job was created from."""
        return self._spec

    def start(self):
        """(Re)launch the subprocess; also called for flake/timeout retries."""
        if self._spec.logfilename:
            # make sure the log directory exists
            logfile_dir = os.path.dirname(
                os.path.abspath(self._spec.logfilename))
            if not os.path.exists(logfile_dir):
                os.makedirs(logfile_dir)
            # NOTE(review): 'w+' text mode here, while the tempfile branch
            # below is binary — stdout() yields str vs bytes depending on
            # which branch ran; confirm downstream consumers handle both.
            self._logfile = open(self._spec.logfilename, 'w+')
        else:
            # macOS: a series of quick os.unlink invocation might cause OS
            # error during the creation of temporary file. By using
            # NamedTemporaryFile, we defer the removal of file and directory.
            self._logfile = tempfile.NamedTemporaryFile()
        # Child environment = current env + spec overrides + jobset extras,
        # with non-ASCII characters stripped (see sanitized_environment).
        env = dict(os.environ)
        env.update(self._spec.environ)
        env.update(self._add_env)
        env = sanitized_environment(env)
        self._start = time.time()
        cmdline = self._spec.cmdline
        # The Unix time command is finicky when used with MSBuild, so we don't use it
        # with jobs that run MSBuild.
        global measure_cpu_costs
        if measure_cpu_costs and not 'vsprojects\\build' in cmdline[0]:
            cmdline = ['time', '-p'] + cmdline
        else:
            # NOTE(review): this disables CPU measurement globally (for all
            # subsequent jobs), not just for this job — confirm intentional.
            measure_cpu_costs = False
        try_start = lambda: subprocess.Popen(args=cmdline,
                                             stderr=subprocess.STDOUT,
                                             stdout=self._logfile,
                                             cwd=self._spec.cwd,
                                             shell=self._spec.shell,
                                             env=env)
        # Retry transient launch failures with exponential backoff
        # (0.3s, 0.6s, 1.2s, 2.4s); the final for/else attempt propagates
        # any OSError to the caller.
        delay = 0.3
        for i in range(0, 4):
            try:
                self._process = try_start()
                break
            except OSError:
                message(
                    'WARNING', 'Failed to start %s, retrying in %f seconds' %
                    (self._spec.shortname, delay))
                time.sleep(delay)
                delay *= 2
        else:
            self._process = try_start()
        self._state = _RUNNING

    def state(self):
        """Poll current state of the job. Prints messages at completion."""

        # Reads the whole logfile from the start and caches its tail
        # (at most _MAX_RESULT_SIZE) into the JobResult.
        def stdout(self=self):
            stdout = read_from_start(self._logfile)
            self.result.message = stdout[-_MAX_RESULT_SIZE:]
            return stdout

        if self._state == _RUNNING and self._process.poll() is not None:
            # Process has exited: classify as flake-retry, failure or pass.
            elapsed = time.time() - self._start
            self.result.elapsed_time = elapsed
            if self._process.returncode != 0:
                if self._retries < self._spec.flake_retries:
                    message('FLAKE',
                            '%s [ret=%d, pid=%d]' %
                            (self._spec.shortname, self._process.returncode,
                             self._process.pid),
                            stdout(),
                            do_newline=True)
                    self._retries += 1
                    self.result.num_failures += 1
                    self.result.retries = self._timeout_retries + self._retries
                    # NOTE: job is restarted regardless of jobset's max_time setting
                    self.start()
                else:
                    self._state = _FAILURE
                    if not self._suppress_failure_message:
                        message('FAILED',
                                '%s [ret=%d, pid=%d, time=%.1fsec]' %
                                (self._spec.shortname, self._process.returncode,
                                 self._process.pid, elapsed),
                                stdout(),
                                do_newline=True)
                    self.result.state = 'FAILED'
                    self.result.num_failures += 1
                    self.result.returncode = self._process.returncode
            else:
                self._state = _SUCCESS
                measurement = ''
                if measure_cpu_costs:
                    # Parse the trailing `time -p` report appended to the
                    # job's output (real/user/sys lines).
                    m = re.search(
                        r'real\s+([0-9.]+)\nuser\s+([0-9.]+)\nsys\s+([0-9.]+)',
                        (stdout()).decode('utf8', errors='replace'))
                    real = float(m.group(1))
                    user = float(m.group(2))
                    # NOTE(review): this local 'sys' shadows the imported sys
                    # module for the rest of this scope — confirm intentional.
                    sys = float(m.group(3))
                    # Ignore jobs too short for a meaningful cores estimate.
                    if real > 0.5:
                        cores = (user + sys) / real
                        self.result.cpu_measured = float('%.01f' % cores)
                        self.result.cpu_estimated = float('%.01f' %
                                                          self._spec.cpu_cost)
                        measurement = '; cpu_cost=%.01f; estimated=%.01f' % (
                            self.result.cpu_measured, self.result.cpu_estimated)
                if not self._quiet_success:
                    message('PASSED',
                            '%s [time=%.1fsec, retries=%d:%d%s]' %
                            (self._spec.shortname, elapsed, self._retries,
                             self._timeout_retries, measurement),
                            stdout() if self._spec.verbose_success else None,
                            do_newline=self._newline_on_success or self._travis)
                self.result.state = 'PASSED'
        elif (self._state == _RUNNING and
              self._spec.timeout_seconds is not None and
              time.time() - self._start > self._spec.timeout_seconds):
            # Still running but over budget: retry as a timeout flake if the
            # spec allows, otherwise kill and record a TIMEOUT.
            elapsed = time.time() - self._start
            self.result.elapsed_time = elapsed
            if self._timeout_retries < self._spec.timeout_retries:
                message('TIMEOUT_FLAKE',
                        '%s [pid=%d]' %
                        (self._spec.shortname, self._process.pid),
                        stdout(),
                        do_newline=True)
                self._timeout_retries += 1
                self.result.num_failures += 1
                self.result.retries = self._timeout_retries + self._retries
                if self._spec.kill_handler:
                    self._spec.kill_handler(self)
                self._process.terminate()
                # NOTE: job is restarted regardless of jobset's max_time setting
                self.start()
            else:
                message('TIMEOUT',
                        '%s [pid=%d, time=%.1fsec]' %
                        (self._spec.shortname, self._process.pid, elapsed),
                        stdout(),
                        do_newline=True)
                self.kill()
                self.result.state = 'TIMEOUT'
                self.result.num_failures += 1
        return self._state

    def kill(self):
        """Terminate the subprocess (if still running) and mark job _KILLED."""
        if self._state == _RUNNING:
            self._state = _KILLED
            if self._spec.kill_handler:
                self._spec.kill_handler(self)
            self._process.terminate()

    def suppress_failure_message(self):
        """Stop state() from printing the FAILED line (result still records it)."""
        self._suppress_failure_message = True
421
422
class Jobset(object):
    """Manages one run of jobs."""

    def __init__(self, check_cancelled, maxjobs, maxjobs_cpu_agnostic,
                 newline_on_success, travis, stop_on_failure, add_env,
                 quiet_success, max_time):
        """
    Arguments:
      check_cancelled: zero-arg callable polled for external cancellation
      maxjobs: cap on the summed cpu_cost of concurrently running jobs
      maxjobs_cpu_agnostic: cap on the plain count of concurrent jobs
      newline_on_success, travis, quiet_success: output verbosity knobs
      stop_on_failure: kill all running jobs after the first failure
      add_env: extra environment variables passed to every job
      max_time: wall-clock budget in seconds; jobs submitted after it has
        elapsed are recorded as SKIPPED (non-positive disables the budget)
    """
        self._running = set()
        self._check_cancelled = check_cancelled
        self._cancelled = False
        self._failures = 0
        self._completed = 0
        self._maxjobs = maxjobs
        self._maxjobs_cpu_agnostic = maxjobs_cpu_agnostic
        self._newline_on_success = newline_on_success
        self._travis = travis
        self._stop_on_failure = stop_on_failure
        self._add_env = add_env
        self._quiet_success = quiet_success
        self._max_time = max_time
        self.resultset = {}
        self._remaining = None
        self._start_time = time.time()

    def set_remaining(self, remaining):
        """Record how many jobs are still queued (used for the ETA display)."""
        self._remaining = remaining

    def get_num_failures(self):
        """Return the number of failed attempts observed so far."""
        return self._failures

    def cpu_cost(self):
        """Sum of the cpu_cost of all currently running jobs."""
        c = 0
        for job in self._running:
            c += job._spec.cpu_cost
        return c

    def start(self, spec):
        """Start a job. Return True on success, False on failure."""
        # Block (reaping finished jobs) until there is capacity for spec.
        while True:
            if self._max_time > 0 and time.time(
            ) - self._start_time > self._max_time:
                # Over the wall-clock budget: record as skipped, report success.
                skipped_job_result = JobResult()
                skipped_job_result.state = 'SKIPPED'
                message('SKIPPED', spec.shortname, do_newline=True)
                self.resultset[spec.shortname] = [skipped_job_result]
                return True
            if self.cancelled():
                return False
            current_cpu_cost = self.cpu_cost()
            # Nothing running: always admit (even an over-budget cpu_cost).
            if current_cpu_cost == 0:
                break
            # Admit only while under both the cpu budget and the job count cap.
            if current_cpu_cost + spec.cpu_cost <= self._maxjobs:
                if len(self._running) < self._maxjobs_cpu_agnostic:
                    break
            self.reap(spec.shortname, spec.cpu_cost)
        if self.cancelled():
            return False
        job = Job(spec, self._newline_on_success, self._travis, self._add_env,
                  self._quiet_success)
        self._running.add(job)
        if job.GetSpec().shortname not in self.resultset:
            self.resultset[job.GetSpec().shortname] = []
        return True

    def reap(self, waiting_for=None, waiting_for_cost=None):
        """Collect the dead jobs."""
        while self._running:
            dead = set()
            for job in self._running:
                st = eintr_be_gone(lambda: job.state())
                if st == _RUNNING:
                    continue
                if st == _FAILURE or st == _KILLED:
                    self._failures += 1
                    if self._stop_on_failure:
                        self._cancelled = True
                        # BUGFIX: use a distinct loop variable. The original
                        # reused `job` here, so after this inner loop `job`
                        # pointed at an arbitrary running job and the wrong
                        # job was added to `dead` below.
                        for victim in self._running:
                            victim.kill()
                dead.add(job)
                break
            for job in dead:
                self._completed += 1
                if not self._quiet_success or job.result.state != 'PASSED':
                    self.resultset[job.GetSpec().shortname].append(job.result)
                self._running.remove(job)
            if dead:
                return
            # Nothing finished yet: print a WAITING status line (with an ETA
            # extrapolated from throughput so far) on interactive platforms.
            if not self._travis and platform_string() != 'windows':
                rstr = '' if self._remaining is None else '%d queued, ' % self._remaining
                if self._remaining is not None and self._completed > 0:
                    now = time.time()
                    sofar = now - self._start_time
                    remaining = sofar / self._completed * (self._remaining +
                                                           len(self._running))
                    rstr = 'ETA %.1f sec; %s' % (remaining, rstr)
                if waiting_for is not None:
                    wstr = ' next: %s @ %.2f cpu' % (waiting_for,
                                                     waiting_for_cost)
                else:
                    wstr = ''
                message(
                    'WAITING',
                    '%s%d jobs running, %d complete, %d failed (load %.2f)%s' %
                    (rstr, len(self._running), self._completed, self._failures,
                     self.cpu_cost(), wstr))
            if platform_string() == 'windows':
                # No SIGCHLD on Windows: fall back to polling.
                time.sleep(0.1)
            else:
                # Sleep until a child exits (SIGCHLD) or 10s pass (SIGALRM);
                # both signals have no-op handlers installed at module load.
                signal.alarm(10)
                signal.pause()

    def cancelled(self):
        """Poll for cancellation."""
        if self._cancelled:
            return True
        if not self._check_cancelled():
            return False
        # External cancellation requested: kill everything, latch the flag.
        for job in self._running:
            job.kill()
        self._cancelled = True
        return True

    def finish(self):
        """Wait for all running jobs; return True iff nothing failed."""
        while self._running:
            if self.cancelled():
                pass  # poll cancellation
            self.reap()
        if platform_string() != 'windows':
            # Clear any pending watchdog alarm set by reap().
            signal.alarm(0)
        return not self.cancelled() and self._failures == 0
552
553
554def _never_cancelled():
555    return False
556
557
def tag_remaining(xs):
    """Yield (item, remaining) for each item of *xs*.

    `remaining` is how many items follow, or None for items flushed early:
    at most 5000 items are buffered, so only the final 5000 get a count.
    """
    buffered = []
    for item in xs:
        buffered.append(item)
        if len(buffered) > 5000:
            # Buffer full: release the oldest item without a count.
            yield (buffered.pop(0), None)
    total = len(buffered)
    for position, item in enumerate(buffered):
        yield (item, total - position - 1)
567
568
def run(cmdlines,
        check_cancelled=_never_cancelled,
        maxjobs=None,
        maxjobs_cpu_agnostic=None,
        newline_on_success=False,
        travis=False,
        infinite_runs=False,
        stop_on_failure=False,
        add_env=None,
        skip_jobs=False,
        quiet_success=False,
        max_time=-1):
    """Run all JobSpecs in *cmdlines* and wait for them to finish.

    Args:
      cmdlines: iterable of JobSpec objects to run.
      check_cancelled: zero-arg callable polled for external cancellation.
      maxjobs / maxjobs_cpu_agnostic: concurrency caps; default _DEFAULT_MAX_JOBS.
      newline_on_success, travis, quiet_success: output verbosity knobs.
      infinite_runs: unused here; kept for interface compatibility.
      stop_on_failure: kill everything after the first failure.
      add_env: extra environment variables for every job (default: none).
      skip_jobs: don't run anything; mark every job SKIPPED.
      max_time: wall-clock budget in seconds (non-positive disables).

    Returns:
      (number_of_failures, resultset) where resultset maps each job's
      shortname to a list of JobResult objects.
    """
    # BUGFIX: the default was the mutable literal `{}`, shared across calls.
    if add_env is None:
        add_env = {}
    if skip_jobs:
        resultset = {}
        for job in cmdlines:
            # BUGFIX: create a fresh JobResult per job; the original shared a
            # single instance across every resultset entry, so a later
            # mutation of one entry would silently change all of them.
            skipped_job_result = JobResult()
            skipped_job_result.state = 'SKIPPED'
            message('SKIPPED', job.shortname, do_newline=True)
            resultset[job.shortname] = [skipped_job_result]
        return 0, resultset
    js = Jobset(
        check_cancelled, maxjobs if maxjobs is not None else _DEFAULT_MAX_JOBS,
        maxjobs_cpu_agnostic if maxjobs_cpu_agnostic is not None else
        _DEFAULT_MAX_JOBS, newline_on_success, travis, stop_on_failure, add_env,
        quiet_success, max_time)
    for cmdline, remaining in tag_remaining(cmdlines):
        if not js.start(cmdline):
            break
        if remaining is not None:
            js.set_remaining(remaining)
    js.finish()
    return js.get_num_failures(), js.resultset
601