• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""
2TestCmd.py:  a testing framework for commands and scripts.
3
4The TestCmd module provides a framework for portable automated testing of
5executable commands and scripts (in any language, not just Python), especially
6commands and scripts that require file system interaction.
7
8In addition to running tests and evaluating conditions, the TestCmd module
9manages and cleans up one or more temporary workspace directories, and provides
methods for creating files and directories in those workspace directories from
in-line data (that is, here-documents), allowing tests to be completely
self-contained.
12
13A TestCmd environment object is created via the usual invocation:
14
15    test = TestCmd()
16
17The TestCmd module provides pass_test(), fail_test(), and no_result() unbound
18methods that report test results for use with the Aegis change management
19system. These methods terminate the test immediately, reporting PASSED, FAILED
20or NO RESULT respectively and exiting with status 0 (success), 1 or 2
21respectively. This allows for a distinction between an actual failed test and a
22test that could not be properly evaluated because of an external condition (such
23as a full file system or incorrect permissions).
24
25"""
26
27# Copyright 2000 Steven Knight
28# This module is free software, and you may redistribute it and/or modify
29# it under the same terms as Python itself, so long as this copyright message
30# and disclaimer are retained in their original form.
31#
32# IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
33# SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
34# THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
35# DAMAGE.
36#
37# THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
38# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
39# PARTICULAR PURPOSE.  THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
40# AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
41# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
42
43# Copyright 2002-2003 Vladimir Prus.
44# Copyright 2002-2003 Dave Abrahams.
45# Copyright 2006 Rene Rivera.
46# Distributed under the Boost Software License, Version 1.0.
47#    (See accompanying file LICENSE_1_0.txt or copy at
48#         http://www.boost.org/LICENSE_1_0.txt)
49
50from __future__ import print_function
51
52__author__ = "Steven Knight <knight@baldmt.com>"
53__revision__ = "TestCmd.py 0.D002 2001/08/31 14:56:12 software"
54__version__ = "0.02"
55
from types import *

import atexit
import os
import os.path
import re
import shutil
import stat
import subprocess
import sys
import tempfile
import traceback
67
68
# Default name prefix for temporary files/directories created through the
# tempfile module (Python 3 consults tempfile.template at call time).
tempfile.template = 'testcmd.'

# Test environments that own temporary directories and therefore still need
# their cleanup() method called before the interpreter exits.
_Cleanup = []


def _clean():
    """Call cleanup() on every registered test environment, newest first.

    The registry is emptied before any cleanup() runs, so an environment
    that tries to unregister itself from inside cleanup() cannot disturb the
    iteration.

    """
    global _Cleanup
    pending = _Cleanup[:]
    _Cleanup = []
    for test in reversed(pending):
        test.cleanup()


# sys.exitfunc is silently ignored by Python 3, so the hook would never run
# there; atexit is honoured by both Python 2 and Python 3.
atexit.register(_clean)
82
83
def caller(tblist, skip):
    """Format the part of a traceback that lies outside this module.

    tblist -- sequence of (file, line, name, text) entries, outermost call
              first, as produced by traceback.extract_stack().
    skip   -- number of innermost reported frames to leave out.

    Entries are consumed until the first one whose file name ends in
    "TestCmd.py"; the remaining ones are reported innermost first, one per
    line, the first prefixed with "at" and the others with a tab and "from".
    Returns the report as a single string (empty when nothing is left).

    """
    frames = []
    for filename, lineno, funcname, text in tblist:
        if filename.endswith("TestCmd.py"):
            break
        frames.append((filename, lineno, funcname, text))
    # Innermost frame first.
    frames.reverse()

    report = []
    prefix = "at"
    for filename, lineno, funcname, _ in frames[skip:]:
        # "?" is the placeholder used for code outside any function.
        where = "" if funcname == "?" else " (" + funcname + ")"
        report.append("%s line %d of %s%s\n" %
            (prefix, lineno, filename, where))
        prefix = "\tfrom"
    return "".join(report)
100
101
def fail_test(self=None, condition=True, function=None, skip=0):
    """Cause the test to fail.

      Does nothing when condition is false. Otherwise calls function (if one
    is given), writes a FAILED report to standard error and exits with a
    status of 1. When self is given, its program and description attributes
    are included in the report; skip is the number of innermost stack frames
    to leave out of the reported call location.

    """
    if not condition:
        return
    if function is not None:
        function()

    of = ""
    desc = ""
    sep = " "
    if self is not None:
        if self.program:
            of = " of " + " ".join(self.program)
            sep = "\n\t"
        if self.description:
            desc = " [" + self.description + "]"
            sep = "\n\t"

    at = caller(traceback.extract_stack(), skip)

    report = ["FAILED test", of, desc, sep, at, "\nin directory: ",
        os.getcwd()]
    sys.stderr.write("".join(report))
    sys.exit(1)
130
131
def no_result(self=None, condition=True, function=None, skip=0):
    """Causes a test to exit with no valid result.

      Does nothing when condition is false. Otherwise calls function (if one
    is given), writes a NO RESULT report to standard error and exits with a
    status of 2. When self is given, its program and description attributes
    are included in the report; skip is the number of innermost stack frames
    to leave out of the reported call location.

    """
    if not condition:
        return
    if function is not None:
        function()
    of = ""
    desc = ""
    sep = " "
    if self is not None:
        if self.program:
            program = self.program
            # The program is normally a list of command words; concatenating
            # it to a string directly would raise TypeError.
            if isinstance(program, (list, tuple)):
                program = " ".join(program)
            of = " of " + program
            sep = "\n\t"
        if self.description:
            desc = " [" + self.description + "]"
            sep = "\n\t"

    at = caller(traceback.extract_stack(), skip)
    sys.stderr.write("NO RESULT for test" + of + desc + sep + at)
    sys.exit(2)
158
159
def pass_test(self=None, condition=True, function=None):
    """Causes a test to pass.

      Does nothing when condition is false. Otherwise calls function (if one
    is given), writes PASSED to standard error and exits with a status of 0.
    The self argument is accepted but not used.

    """
    if condition:
        if function is not None:
            function()
        sys.stderr.write("PASSED\n")
        sys.exit(0)
174
class MatchError(object):
    """Falsy result object describing why a match failed.

    Instances always evaluate as false, so a match function can return one
    in place of a plain false value while still carrying a human-readable
    explanation in the message attribute.

    """

    def __init__(self, message):
        self.message = message

    def __bool__(self):
        # Truth-value hook used by Python 3.
        return False

    # Truth-value hook used by Python 2.
    __nonzero__ = __bool__
182
def match_exact(lines=None, matches=None):
    """
      Returns whether the given lists or strings containing lines separated
    using newline characters contain exactly the same data.

      lines holds the actual data and matches the expected data. Returns 1
    when they are identical; otherwise returns a (falsy) MatchError whose
    message names the first differing line or lists the missing/extra lines.

    """
    if not isinstance(lines, list):
        lines = lines.split("\n")
    if not isinstance(matches, list):
        matches = matches.split("\n")
    # Compare only the common prefix here; a length difference is reported
    # below so the caller gets a diagnostic instead of a bare None.
    for i, (actual, expected) in enumerate(zip(lines, matches)):
        if actual != expected:
            return MatchError("Mismatch at line %d\n- %s\n+ %s\n" %
                (i+1, expected, actual))
    if len(lines) < len(matches):
        return MatchError("Missing lines at line %d\n- %s" %
            (len(lines), "\n- ".join(matches[len(lines):])))
    if len(lines) > len(matches):
        return MatchError("Extra lines at line %d\n+ %s" %
            (len(matches), "\n+ ".join(lines[len(matches):])))
    return 1
206
207
def match_re(lines=None, res=None):
    """
      Matches actual lines against regular expressions, line by line.

      Both arguments may be lists of lines or strings with lines separated
    by newline characters. Each entry of res is anchored with "^" and "$"
    and searched for in the corresponding entry of lines. Returns 1 when
    every line matches and both have the same number of lines; otherwise
    returns a (falsy) MatchError describing the first problem found.

    """
    actual = lines if type(lines) is list else lines.split("\n")
    patterns = res if type(res) is list else res.split("\n")

    for number, (pattern, line) in enumerate(zip(patterns, actual), 1):
        if re.search("^" + pattern + "$", line) is None:
            return MatchError("Mismatch at line %d\n- %s\n+ %s\n" %
                (number, pattern, line))

    n_actual = len(actual)
    n_patterns = len(patterns)
    if n_actual < n_patterns:
        return MatchError("Missing lines at line %d\n- %s" %
            (n_actual, "\n- ".join(patterns[n_actual:])))
    if n_actual > n_patterns:
        return MatchError("Extra lines at line %d\n+ %s" %
            (n_patterns, "\n+ ".join(actual[n_patterns:])))
    return 1
230
231
class TestCmd:
    """Test environment for running a command in temporary work directories.

    An instance remembers the program under test, may own temporary working
    directories, records the standard output, error output and exit status
    of every run() and removes the directories it created in cleanup()
    unless it was asked to preserve them.

    """

    def __init__(self, description=None, program=None, workdir=None,
        subdir=None, verbose=False, match=None, inpath=None):
        """
          Creates a test environment.

        description -- text included in failure reports.
        program     -- command to test: a list whose first element is the
                       executable, followed by any fixed arguments.
        workdir     -- working directory; see workdir_set() for the meaning
                       of None, '' and path names.
        subdir      -- subdirectory to create in the working directory; see
                       subdir().
        verbose     -- if true, run() echoes the command and its output.
        match       -- function used by match(); defaults to match_re.
        inpath      -- if true, program[0] is used as given instead of being
                       resolved relative to the current directory.

          Directories are preserved after the test according to the
        PRESERVE, PRESERVE_PASS, PRESERVE_FAIL and PRESERVE_NO_RESULT
        environment variables; see cleanup().

        """
        # Set first so cleanup() (also reached through __del__) is safe even
        # if a later step of the constructor raises.
        self._dirlist = []
        self.workdir = None
        self._cwd = os.getcwd()
        self.description_set(description)
        self.program_set(program, inpath)
        self.verbose_set(verbose)
        if match is None:
            self.match_func = match_re
        else:
            self.match_func = match
        self._preserve = {'pass_test': 0, 'fail_test': 0, 'no_result': 0}
        env = os.environ.get('PRESERVE')
        if env:
            self._preserve = dict.fromkeys(self._preserve, env)
        else:
            per_condition = (('PRESERVE_PASS', 'pass_test'),
                             ('PRESERVE_FAIL', 'fail_test'),
                             ('PRESERVE_NO_RESULT', 'no_result'))
            for variable, condition in per_condition:
                env = os.environ.get(variable)
                if env is not None:
                    self._preserve[condition] = env
        self._stdout = []
        self._stderr = []
        self.status = None
        self.condition = 'no_result'
        self.workdir_set(workdir)
        self.subdir(subdir)

    def __del__(self):
        self.cleanup()

    def __repr__(self):
        return "%x" % id(self)

    def cleanup(self, condition=None):
        """
          Removes any temporary working directories for the specified TestCmd
        environment. If the environment variable PRESERVE was set when the
        TestCmd environment was created, temporary working directories are not
        removed. If any of the environment variables PRESERVE_PASS,
        PRESERVE_FAIL or PRESERVE_NO_RESULT were set when the TestCmd
        environment was created, then temporary working directories are not
        removed if the test passed, failed or had no result, respectively.
        Temporary working directories are also preserved for conditions
        specified via the preserve method.

          Typically, this method is not called directly, but is used when the
        script exits to clean up temporary working directories as appropriate
        for the exit status.

          condition is one of 'pass_test', 'fail_test' or 'no_result' and
        defaults to the condition recorded on the environment.

        """
        if not self._dirlist:
            return
        if condition is None:
            condition = self.condition
        if self._preserve[condition]:
            for directory in self._dirlist:
                print("Preserved directory %s" % directory)
        else:
            # Remove in reverse order of creation.
            for directory in reversed(self._dirlist):
                # Read-only entries would otherwise defeat rmtree().
                self.writable(directory, 1)
                shutil.rmtree(directory, ignore_errors=True)

        self._dirlist = []
        self.workdir = None
        os.chdir(self._cwd)
        try:
            global _Cleanup
            _Cleanup.remove(self)
        except (AttributeError, ValueError):
            # Not registered (any more), or the module is being torn down.
            pass

    def description_set(self, description):
        """Set the description of the functionality being tested."""
        self.description = description

    def fail_test(self, condition=True, function=None, skip=0):
        """Cause the test to fail."""
        if not condition:
            return
        self.condition = 'fail_test'
        fail_test(self = self,
                  condition = condition,
                  function = function,
                  skip = skip)

    def match(self, lines, matches):
        """Compare actual and expected file contents."""
        return self.match_func(lines, matches)

    def match_exact(self, lines, matches):
        """Compare actual and expected file content exactly."""
        return match_exact(lines, matches)

    def match_re(self, lines, res):
        """Compare file content with a regular expression."""
        return match_re(lines, res)

    def no_result(self, condition=True, function=None, skip=0):
        """Report that the test could not be run."""
        if not condition:
            return
        self.condition = 'no_result'
        no_result(self = self,
                  condition = condition,
                  function = function,
                  skip = skip)

    def pass_test(self, condition=True, function=None):
        """Cause the test to pass."""
        if not condition:
            return
        self.condition = 'pass_test'
        pass_test(self, condition, function)

    def preserve(self, *conditions):
        """
          Arrange for the temporary working directories for the specified
        TestCmd environment to be preserved for one or more conditions. If no
        conditions are specified, arranges for the temporary working
        directories to be preserved for all conditions.

        """
        # Test for emptiness by value; "is ()" compared object identity.
        if not conditions:
            conditions = ('pass_test', 'fail_test', 'no_result')
        for cond in conditions:
            self._preserve[cond] = 1

    def program_set(self, program, inpath):
        """
          Set the executable program or script to be tested.

          program is a list whose first element is the executable. Unless
        inpath is true, a relative executable is resolved against the
        directory that was current when the environment was created. The
        caller's list is copied, not modified.

        """
        if program:
            program = list(program)
            if not inpath and not os.path.isabs(program[0]):
                program[0] = os.path.join(self._cwd, program[0])
        self.program = program

    def read(self, file, mode='rb'):
        """
          Reads and returns the contents of the specified file name. The file
        name may be a list, in which case the elements are concatenated with
        the os.path.join() method. The file is assumed to be under the
        temporary working directory unless it is an absolute path name. The I/O
        mode for the file may be specified and must begin with an 'r'. The
        default is 'rb' (binary read).

          Raises ValueError if the mode does not begin with 'r'.

        """
        if isinstance(file, (list, tuple)):
            file = os.path.join(*file)
        if not os.path.isabs(file):
            file = os.path.join(self.workdir, file)
        if mode[0] != 'r':
            raise ValueError("mode must begin with 'r'")
        # Close the file promptly instead of relying on garbage collection.
        with open(file, mode) as f:
            return f.read()

    def run(self, program=None, arguments=None, chdir=None, stdin=None,
        universal_newlines=True):
        """
          Runs a test of the program or script for the test environment.
        Standard output and error output are saved for future retrieval via the
        stdout() and stderr() methods; the exit status is stored in the
        status attribute.

          program, if given, replaces the environment's program for this run
        and a relative executable is resolved against the directory that was
        current when the environment was created. arguments is a string that
        is split on single spaces. chdir is the directory to run in
        (relative to the working directory unless absolute) and defaults to
        the working directory. stdin is a string, or list of strings, fed to
        the child's standard input.

          'universal_newlines' parameter controls how the child process
        input/output streams are opened as defined for the same named Python
        subprocess.POpen constructor parameter.

        """
        if chdir:
            if not os.path.isabs(chdir):
                chdir = self.workpath(chdir)
            if self.verbose:
                sys.stderr.write("chdir(" + chdir + ")\n")
        else:
            chdir = self.workdir

        if program and program[0]:
            # Work on a copy so the caller's list is left untouched.
            cmd = list(program)
            default = self.program[0] if self.program else None
            if cmd[0] != default and not os.path.isabs(cmd[0]):
                cmd[0] = os.path.join(self._cwd, cmd[0])
        else:
            cmd = list(self.program)
        if arguments:
            cmd += arguments.split(" ")
        if self.verbose:
            sys.stderr.write("run(" + " ".join(cmd) + ")\n")
        p = subprocess.Popen(cmd, stdin=subprocess.PIPE,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=chdir,
            universal_newlines=universal_newlines)

        if stdin and isinstance(stdin, list):
            stdin = "".join(stdin)
        out, err = p.communicate(stdin)
        # Without universal_newlines the pipes yield bytes; store text.
        if not isinstance(out, str):
            out = out.decode()
        if not isinstance(err, str):
            err = err.decode()
        self._stdout.append(out)
        self._stderr.append(err)
        self.status = p.returncode

        if self.verbose:
            sys.stdout.write(self._stdout[-1])
            sys.stderr.write(self._stderr[-1])

    def _output(self, outputs, run):
        """Select the entry of outputs addressed by a 1-based run number."""
        if not run:
            run = len(outputs)
        elif run < 0:
            run = len(outputs) + run
        run -= 1
        if run < 0:
            return ''
        return outputs[run]

    def stderr(self, run=None):
        """
          Returns the error output from the specified run number. If there is
        no specified run number, then returns the error output of the last run.
        If the run number is less than zero, then returns the error output from
        that many runs back from the current run.

        """
        return self._output(self._stderr, run)

    def stdout(self, run=None):
        """
          Returns the standard output from the specified run number. If there
        is no specified run number, then returns the standard output of the
        last run. If the run number is less than zero, then returns the
        standard output from that many runs back from the current run.

        """
        return self._output(self._stdout, run)

    def subdir(self, *subdirs):
        """
          Create new subdirectories under the temporary working directory, one
        for each argument. An argument may be a list, in which case the list
        elements are concatenated using the os.path.join() method.
        Subdirectories multiple levels deep must be created using a separate
        argument for each level:

            test.subdir('sub', ['sub', 'dir'], ['sub', 'dir', 'ectory'])

        Arguments that are None are skipped. Returns the number of
        subdirectories actually created.

        """
        count = 0
        for sub in subdirs:
            if sub is None:
                continue
            if isinstance(sub, (list, tuple)):
                sub = os.path.join(*sub)
            new = os.path.join(self.workdir, sub)
            try:
                os.mkdir(new)
            except OSError:
                # Already exists or cannot be created: simply not counted.
                pass
            else:
                count += 1
        return count

    def unlink(self, file):
        """
          Unlinks the specified file name. The file name may be a list, in
        which case the elements are concatenated using the os.path.join()
        method. The file is assumed to be under the temporary working directory
        unless it is an absolute path name.

        """
        if isinstance(file, (list, tuple)):
            file = os.path.join(*file)
        if not os.path.isabs(file):
            file = os.path.join(self.workdir, file)
        os.unlink(file)

    def verbose_set(self, verbose):
        """Set the verbose level."""
        self.verbose = verbose

    def workdir_set(self, path):
        """
          Sets the working directory of the test environment.

          If the path is None, the environment has no working directory. An
        absolute path is used as given; it is neither created nor removed by
        cleanup(). If the path is a null string (''), a uniquely named
        temporary directory is created. Any other (relative) path is created
        as a new directory. Directories created here are removed again by
        cleanup().

        """
        if path is None:
            self.workdir = None
            return
        if os.path.isabs(path):
            self.workdir = path
            return

        if path == '':
            # mkdtemp() creates the directory atomically, unlike choosing a
            # name with mktemp() and creating it afterwards.
            path = tempfile.mkdtemp()
        else:
            os.mkdir(path)
        self._dirlist.append(path)
        global _Cleanup
        if self not in _Cleanup:
            _Cleanup.append(self)
        # We would like to set self.workdir like this:
        #     self.workdir = path
        # But symlinks in the path will report things differently from
        # os.getcwd(), so chdir there and back to fetch the canonical
        # path.
        cwd = os.getcwd()
        os.chdir(path)
        try:
            self.workdir = os.getcwd()
        finally:
            os.chdir(cwd)

    def workpath(self, *args):
        """
          Returns the absolute path name to a subdirectory or file within the
        current temporary working directory. Concatenates the temporary working
        directory name with the specified arguments using os.path.join().

        """
        return os.path.join(self.workdir, *args)

    def writable(self, top, write):
        """
          Make the specified directory tree writable (write is true) or not
        (write is false) for its owner.

          Every directory below top and every file inside those directories
        has its owner-write permission bit set or cleared. This is a
        best-effort operation: the first operating system error stops the
        walk and is otherwise ignored.

        """
        if write:
            new_mode = lambda mode: stat.S_IMODE(mode | 0o200)
        else:
            new_mode = lambda mode: stat.S_IMODE(mode & ~0o200)

        try:
            for root, _, files in os.walk(top):
                targets = [root]
                targets.extend(os.path.join(root, name) for name in files)
                for target in targets:
                    st = os.stat(target)
                    os.chmod(target, new_mode(st[stat.ST_MODE]))
        except OSError:
            pass  # Ignore any problems changing modes.

    def write(self, file, content, mode='wb'):
        """
          Writes the specified content text (second argument) to the specified
        file name (first argument). The file name may be a list, in which case
        the elements are concatenated using the os.path.join() method. The file
        is created under the temporary working directory. Any subdirectories in
        the path must already exist. The I/O mode for the file may be specified
        and must begin with a 'w'. The default is 'wb' (binary write).

          Raises ValueError if the mode does not begin with 'w'.

        """
        if isinstance(file, (list, tuple)):
            file = os.path.join(*file)
        if not os.path.isabs(file):
            file = os.path.join(self.workdir, file)
        if mode[0] != 'w':
            raise ValueError("mode must begin with 'w'")
        # Close explicitly so the content is flushed before anyone reads it.
        with open(file, mode) as f:
            f.write(content)
610