• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""
2TestCmd.py:  a testing framework for commands and scripts.
3
4The TestCmd module provides a framework for portable automated testing
5of executable commands and scripts (in any language, not just Python),
6especially commands and scripts that require file system interaction.
7
8In addition to running tests and evaluating conditions, the TestCmd
9module manages and cleans up one or more temporary workspace
10directories, and provides methods for creating files and directories in
those workspace directories from in-line data (here-documents), allowing
12tests to be completely self-contained.
13
14A TestCmd environment object is created via the usual invocation:
15
16    import TestCmd
17    test = TestCmd.TestCmd()
18
19There are a bunch of keyword arguments available at instantiation:
20
21    test = TestCmd.TestCmd(description = 'string',
22                           program = 'program_or_script_to_test',
23                           interpreter = 'script_interpreter',
24                           workdir = 'prefix',
25                           subdir = 'subdir',
26                           verbose = Boolean,
27                           match = default_match_function,
28                           diff = default_diff_function,
29                           combine = Boolean)
30
31There are a bunch of methods that let you do different things:
32
33    test.verbose_set(1)
34
35    test.description_set('string')
36
37    test.program_set('program_or_script_to_test')
38
39    test.interpreter_set('script_interpreter')
40    test.interpreter_set(['script_interpreter', 'arg'])
41
42    test.workdir_set('prefix')
43    test.workdir_set('')
44
45    test.workpath('file')
46    test.workpath('subdir', 'file')
47
48    test.subdir('subdir', ...)
49
50    test.rmdir('subdir', ...)
51
52    test.write('file', "contents\n")
53    test.write(['subdir', 'file'], "contents\n")
54
55    test.read('file')
56    test.read(['subdir', 'file'])
57    test.read('file', mode)
58    test.read(['subdir', 'file'], mode)
59
60    test.writable('dir', 1)
61    test.writable('dir', None)
62
63    test.preserve(condition, ...)
64
65    test.cleanup(condition)
66
67    test.command_args(program = 'program_or_script_to_run',
68                      interpreter = 'script_interpreter',
69                      arguments = 'arguments to pass to program')
70
71    test.run(program = 'program_or_script_to_run',
72             interpreter = 'script_interpreter',
73             arguments = 'arguments to pass to program',
74             chdir = 'directory_to_chdir_to',
             stdin = 'input to feed to the program\n',
76             universal_newlines = True)
77
78    p = test.start(program = 'program_or_script_to_run',
79                   interpreter = 'script_interpreter',
80                   arguments = 'arguments to pass to program',
81                   universal_newlines = None)
82
    test.finish(p)
84
85    test.pass_test()
86    test.pass_test(condition)
87    test.pass_test(condition, function)
88
89    test.fail_test()
90    test.fail_test(condition)
91    test.fail_test(condition, function)
92    test.fail_test(condition, function, skip)
93
94    test.no_result()
95    test.no_result(condition)
96    test.no_result(condition, function)
97    test.no_result(condition, function, skip)
98
99    test.stdout()
100    test.stdout(run)
101
102    test.stderr()
103    test.stderr(run)
104
105    test.symlink(target, link)
106
107    test.banner(string)
108    test.banner(string, width)
109
110    test.diff(actual, expected)
111
112    test.match(actual, expected)
113
114    test.match_exact("actual 1\nactual 2\n", "expected 1\nexpected 2\n")
115    test.match_exact(["actual 1\n", "actual 2\n"],
116                     ["expected 1\n", "expected 2\n"])
117
118    test.match_re("actual 1\nactual 2\n", regex_string)
119    test.match_re(["actual 1\n", "actual 2\n"], list_of_regexes)
120
121    test.match_re_dotall("actual 1\nactual 2\n", regex_string)
122    test.match_re_dotall(["actual 1\n", "actual 2\n"], list_of_regexes)
123
124    test.tempdir()
125    test.tempdir('temporary-directory')
126
127    test.sleep()
128    test.sleep(seconds)
129
130    test.where_is('foo')
131    test.where_is('foo', 'PATH1:PATH2')
132    test.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4')
133
134    test.unlink('file')
135    test.unlink('subdir', 'file')
136
137The TestCmd module provides pass_test(), fail_test(), and no_result()
138unbound functions that report test results for use with the Aegis change
139management system.  These methods terminate the test immediately,
140reporting PASSED, FAILED, or NO RESULT respectively, and exiting with
141status 0 (success), 1 or 2 respectively.  This allows for a distinction
142between an actual failed test and a test that could not be properly
143evaluated because of an external condition (such as a full file system
144or incorrect permissions).
145
146    import TestCmd
147
148    TestCmd.pass_test()
149    TestCmd.pass_test(condition)
150    TestCmd.pass_test(condition, function)
151
152    TestCmd.fail_test()
153    TestCmd.fail_test(condition)
154    TestCmd.fail_test(condition, function)
155    TestCmd.fail_test(condition, function, skip)
156
157    TestCmd.no_result()
158    TestCmd.no_result(condition)
159    TestCmd.no_result(condition, function)
160    TestCmd.no_result(condition, function, skip)
161
162The TestCmd module also provides unbound functions that handle matching
163in the same way as the match_*() methods described above.
164
165    import TestCmd
166
167    test = TestCmd.TestCmd(match = TestCmd.match_exact)
168
169    test = TestCmd.TestCmd(match = TestCmd.match_re)
170
171    test = TestCmd.TestCmd(match = TestCmd.match_re_dotall)
172
173The TestCmd module provides unbound functions that can be used for the
174"diff" argument to TestCmd.TestCmd instantiation:
175
176    import TestCmd
177
178    test = TestCmd.TestCmd(match = TestCmd.match_re,
179                           diff = TestCmd.diff_re)
180
181    test = TestCmd.TestCmd(diff = TestCmd.simple_diff)
182
183The "diff" argument can also be used with standard difflib functions:
184
185    import difflib
186
187    test = TestCmd.TestCmd(diff = difflib.context_diff)
188
189    test = TestCmd.TestCmd(diff = difflib.unified_diff)
190
191Lastly, the where_is() method also exists in an unbound function
192version.
193
194    import TestCmd
195
196    TestCmd.where_is('foo')
197    TestCmd.where_is('foo', 'PATH1:PATH2')
198    TestCmd.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4')
199"""
200
201# Copyright 2000-2010 Steven Knight
202# This module is free software, and you may redistribute it and/or modify
203# it under the same terms as Python itself, so long as this copyright message
204# and disclaimer are retained in their original form.
205#
206# IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
207# SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
208# THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
209# DAMAGE.
210#
211# THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
212# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
213# PARTICULAR PURPOSE.  THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
214# AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
215# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
216
217__author__ = "Steven Knight <knight at baldmt dot com>"
218__revision__ = "TestCmd.py 0.37.D001 2010/01/11 16:55:50 knight"
219__version__ = "0.37"
220
221import errno
222import os
223import os.path
224import re
225import shutil
226import stat
227import string
228import sys
229import tempfile
230import time
231import traceback
232import types
233import UserList
234
# Names exported by "from TestCmd import *".
__all__ = [
    'diff_re',
    'fail_test',
    'no_result',
    'pass_test',
    'match_exact',
    'match_re',
    'match_re_dotall',
    'python_executable',
    'TestCmd'
]

# simple_diff() is only defined (later in this module) when difflib can
# be imported, so it may only be exported in that same case.  Exporting
# it when the import *fails* would name a function that does not exist.
try:
    import difflib
except ImportError:
    pass
else:
    __all__.append('simple_diff')
251
def is_List(e):
    """Return a true value if e is a built-in list or a UserList instance.

    The built-in check is an exact type comparison, so subclasses of
    list are only accepted if they are also UserList instances.
    """
    is_builtin_list = type(e) is types.ListType
    return is_builtin_list or isinstance(e, UserList.UserList)
255
try:
    from UserString import UserString
except ImportError:
    # No UserString module available: define an empty stand-in class so
    # the isinstance() checks in is_String() still have something to
    # test against.
    class UserString:
        pass

# Pick the is_String() variant once, at import time, depending on
# whether this Python has a separate unicode string type.
if hasattr(types, 'UnicodeType'):
    def is_String(e):
        """Return a true value if e is a str, unicode or UserString."""
        kind = type(e)
        return kind is types.StringType \
            or kind is types.UnicodeType \
            or isinstance(e, UserString)
else:
    def is_String(e):
        """Return a true value if e is a str or UserString."""
        kind = type(e)
        return kind is types.StringType or isinstance(e, UserString)
270
# Prefix used by the tempfile module for the names it generates.  On
# posix and nt the process id is included in the prefix.
if os.name in ('posix', 'nt'):
    tempfile.template = 'testcmd.' + str(os.getpid()) + '.'
else:
    tempfile.template = 'testcmd.'

# Matches any single whitespace character.  A raw string is used so the
# backslash reaches the regular expression engine untouched instead of
# relying on Python passing through an unrecognized string escape.
re_space = re.compile(r'\s')

# Objects whose cleanup() method is called by _clean().
_Cleanup = []

# Exit function that _clean() calls after it has finished, if any.
_chain_to_exitfunc = None
282
def _clean():
    """Call cleanup() on every object registered in _Cleanup.

    The true entries of _Cleanup are processed in reverse list order.
    The list is emptied before any cleanup() runs.  Afterwards the
    chained exit function, if one was saved, is called.
    """
    global _Cleanup
    pending = []
    for test in _Cleanup:
        if test:
            # Inserting at the front yields the reversed order.
            pending.insert(0, test)
    del _Cleanup[:]
    for test in pending:
        test.cleanup()
    if _chain_to_exitfunc:
        _chain_to_exitfunc()
292
# Arrange for _clean() to run when the interpreter exits.
try:
    import atexit
except ImportError:
    # TODO(1.5): atexit requires python 2.0, so chain sys.exitfunc
    # Remember any exit function that is already installed so that
    # _clean() can call it once it is done.
    if hasattr(sys, 'exitfunc'):
        _chain_to_exitfunc = sys.exitfunc
    sys.exitfunc = _clean
else:
    atexit.register(_clean)
304
# Provide zip() on Pythons that do not have it built in.
try:
    zip
except NameError:
    def zip(*lists):
        """Return a list of tuples pairing up the items of each
        sequence, stopping at the length of the shortest one."""
        shortest = min(map(len, lists))
        result = []
        for i in xrange(shortest):
            row = []
            for seq in lists:
                row.append(seq[i])
            result.append(tuple(row))
        return result
313
class Collector:
    """Callable that accumulates path names in self.entries.

    The list starts out holding `top`.  Each call appends every name in
    `names` joined onto `dirname`.  The (arg, dirname, names) signature
    presumably matches an os.path.walk() visit function -- confirm
    against the callers.
    """
    def __init__(self, top):
        self.entries = [top]
    def __call__(self, arg, dirname, names):
        # arg is accepted for the callback signature but not used.
        for name in names:
            self.entries.append(os.path.join(dirname, name))
320
def _caller(tblist, skip):
    """Format stack entries as "at line N of FILE (function)" text.

    tblist is a sequence of (file, line, name, text) entries.  Entries
    are taken up to, but not including, the first one whose file name
    ends in "TestCmd.py", and are reported in reverse order.  The
    first `skip` entries of that reversed list are left out.

    The first reported line begins with "at" and each later one with a
    tab and "from".  The function name is omitted when it is "?" or
    "<module>".  Returns the lines concatenated into one string.
    """
    frames = []
    for path, lineno, func, text in tblist:
        if path[-10:] == "TestCmd.py":
            break
        frames.insert(0, (path, lineno, func, text))
    pieces = []
    prefix = "at"
    for path, lineno, func, text in frames[skip:]:
        if func in ("?", "<module>"):
            where = ""
        else:
            where = " (" + func + ")"
        pieces.append("%s line %d of %s%s\n" % (prefix, lineno, path, where))
        prefix = "\tfrom"
    return "".join(pieces)
337
def fail_test(self = None, condition = 1, function = None, skip = 0):
    """Report that the test FAILED and exit with status 1.

    Nothing happens (and None is returned) when `condition` is false.
    Otherwise `function`, if given, is called first.  The message
    written to stderr names the program and description of `self` when
    they are set, followed by the caller location from _caller(), with
    `skip` passed through to it.
    """
    if not condition:
        return
    if function is not None:
        function()
    of = desc = ""
    sep = " "
    if self is not None:
        if self.program:
            of = " of " + self.program
        if self.description:
            desc = " [" + self.description + "]"
        # Put the location on its own line once there is extra detail.
        if of or desc:
            sep = "\n\t"

    at = _caller(traceback.extract_stack(), skip)
    sys.stderr.write("FAILED test" + of + desc + sep + at)

    sys.exit(1)
364
def no_result(self = None, condition = 1, function = None, skip = 0):
    """Report NO RESULT for the test and exit with status 2.

    Nothing happens (and None is returned) when `condition` is false.
    Otherwise `function`, if given, is called first.  When the
    TESTCMD_DEBUG_SKIPS environment variable is set to a non-empty
    value, the message includes the program and description of `self`
    and the caller location.  Otherwise only "NO RESULT" is written to
    stderr.
    """
    if not condition:
        return
    if function is not None:
        function()
    of = desc = ""
    sep = " "
    if self is not None:
        if self.program:
            of = " of " + self.program
        if self.description:
            desc = " [" + self.description + "]"
        # Put the location on its own line once there is extra detail.
        if of or desc:
            sep = "\n\t"

    if not os.environ.get('TESTCMD_DEBUG_SKIPS'):
        sys.stderr.write("NO RESULT\n")
    else:
        at = _caller(traceback.extract_stack(), skip)
        sys.stderr.write("NO RESULT for test" + of + desc + sep + at)

    sys.exit(2)
394
def pass_test(self = None, condition = 1, function = None):
    """Report PASSED for the test and exit with status 0.

    Nothing happens (and None is returned) when `condition` is false.
    Otherwise `function`, if given, is called before "PASSED" is
    written to stderr.  `self` is accepted but not used.
    """
    if condition:
        if function is not None:
            function()
        sys.stderr.write("PASSED\n")
        sys.exit(0)
408
def match_exact(lines = None, matches = None):
    """Return 1 if every line equals the corresponding expected line.

    Each argument may be a list of lines or a single string, which is
    split on newlines.  Returns None when the number of lines differs
    or any pair of lines is unequal.
    """
    if not is_List(lines):
        lines = string.split(lines, "\n")
    if not is_List(matches):
        matches = string.split(matches, "\n")
    if len(lines) != len(matches):
        return
    for actual, expected in zip(lines, matches):
        if actual != expected:
            return
    return 1
422
def match_re(lines = None, res = None):
    """Return 1 if every line matches the corresponding regular
    expression.

    Each argument may be a list or a single string, which is split on
    newlines.  Every expression is anchored with "^" and "$" before it
    is applied to its line.  Returns None when the number of lines and
    expressions differs or any line fails to match.

    Raises re.error, with a message naming the offending expression,
    if an expression does not compile.
    """
    if not is_List(lines):
        lines = lines.split("\n")
    if not is_List(res):
        res = res.split("\n")
    if len(lines) != len(res):
        return
    for line, pattern in zip(lines, res):
        s = "^" + pattern + "$"
        try:
            expr = re.compile(s)
        except re.error:
            # sys.exc_info() fetches the exception without needing the
            # version-specific "except X, e" / "except X as e" syntax.
            e = sys.exc_info()[1]
            msg = "Regular expression error in %s: %s"
            raise re.error(msg % (repr(s), e.args[0]))
        if not expr.search(line):
            return
    return 1
442
def match_re_dotall(lines = None, res = None):
    """Return 1 if the whole text matches the regular expression, with
    "." also matching newlines.

    An argument that is not a plain string is joined with newlines
    first.  The expression is anchored with "^" and "$" and compiled
    with re.DOTALL.  Returns None when there is no match.

    Raises re.error, with a message naming the offending expression,
    if the expression does not compile.

    NOTE(review): the string test is an exact type comparison, so other
    string-like objects (for example unicode under Python 2) are joined
    character by character -- confirm whether is_String() should be
    used here instead.
    """
    if not type(lines) is type(""):
        lines = "\n".join(lines)
    if not type(res) is type(""):
        res = "\n".join(res)
    s = "^" + res + "$"
    try:
        expr = re.compile(s, re.DOTALL)
    except re.error:
        # sys.exc_info() fetches the exception without needing the
        # version-specific "except X, e" / "except X as e" syntax.
        e = sys.exc_info()[1]
        msg = "Regular expression error in %s: %s"
        raise re.error(msg % (repr(s), e.args[0]))
    if expr.match(lines):
        return 1
458
try:
    import difflib
except ImportError:
    pass
else:
    def simple_diff(a, b, fromfile='', tofile='',
                    fromfiledate='', tofiledate='', n=3, lineterm='\n'):
        """
        A function with the same calling signature as difflib.context_diff
        (diff -c) and difflib.unified_diff (diff -u) but which returns
        output like the simple, unadorned "diff" command.

        a and b are sequences of lines.  The remaining arguments are
        accepted for signature compatibility only and are not used.
        Returns a list of output lines.
        """
        def span(start, end):
            # Half-open, 0-based [start, end) as a 1-based diff range:
            # a single number for one line, otherwise "first,last".
            if start + 1 == end:
                return str(end)
            return '%s,%s' % (start + 1, end)

        def mark(prefix, lines):
            return [prefix + line for line in lines]

        output = []
        matcher = difflib.SequenceMatcher(None, a, b)
        for op, a1, a2, b1, b2 in matcher.get_opcodes():
            if op == 'replace':
                output.append("%sc%s" % (span(a1, a2), span(b1, b2)))
                output.extend(mark('< ', a[a1:a2]))
                output.append('---')
                output.extend(mark('> ', b[b1:b2]))
            elif op == 'delete':
                output.append("%sd%d" % (span(a1, a2), b1))
                output.extend(mark('< ', a[a1:a2]))
            elif op == 'insert':
                output.append("%da%s" % (a1, span(b1, b2)))
                output.extend(mark('> ', b[b1:b2]))
        return output
488
def diff_re(a, b, fromfile='', tofile='',
                fromfiledate='', tofiledate='', n=3, lineterm='\n'):
    """
    A simple "diff" of two sets of lines when the lines in `a` are
    regular expressions.  This is a really dumb thing that just
    compares each line in turn, so it doesn't look for chunks of
    matching lines and the like--but at least it lets you know
    exactly which lines didn't compare correctly.

    Each line of `a` is anchored with "^" and "$" and searched for in
    the corresponding line of `b`.  The shorter list is padded with
    empty strings.  The remaining arguments are accepted for
    compatibility with the difflib functions and are not used.

    Returns a list of output lines: for each mismatch an "NcN" header,
    "< " plus the repr of the expression, "---", and "> " plus the
    repr of the line.  Raises re.error, with a message naming the
    offending expression, if an expression does not compile.
    """
    result = []
    diff = len(a) - len(b)
    if diff < 0:
        a = a + ['']*(-diff)
    elif diff > 0:
        b = b + ['']*diff
    lineno = 0
    for aline, bline in zip(a, b):
        lineno = lineno + 1
        s = "^" + aline + "$"
        try:
            expr = re.compile(s)
        except re.error:
            # sys.exc_info() fetches the exception without needing the
            # version-specific "except X, e" / "except X as e" syntax.
            e = sys.exc_info()[1]
            msg = "Regular expression error in %s: %s"
            raise re.error(msg % (repr(s), e.args[0]))
        if not expr.search(bline):
            result.append("%sc%s" % (lineno, lineno))
            result.append('< ' + repr(aline))
            result.append('---')
            result.append('> ' + repr(bline))
    return result
519
# Path of the interpreter to use for running Python scripts: normally
# the running interpreter, but a "jython" path under sys.prefix when
# os.name is 'java'.
python_executable = sys.executable
if os.name == 'java':
    python_executable = os.path.join(sys.prefix, 'jython')
527
if sys.platform == 'win32':

    default_sleep_seconds = 2

    def where_is(file, path=None, pathext=None):
        """Return the full path of `file` found on `path`, or None.

        path is a list of directories or a string of directories
        separated by os.pathsep; it defaults to the PATH environment
        variable.  pathext is likewise a list or os.pathsep-separated
        string of suffixes to try, defaulting to PATHEXT.  If `file`
        already ends with one of the suffixes (compared without regard
        to case), no suffix is added.  The first existing regular file
        wins.
        """
        if path is None:
            path = os.environ['PATH']
        if is_String(path):
            path = string.split(path, os.pathsep)
        if pathext is None:
            pathext = os.environ['PATHEXT']
        if is_String(pathext):
            pathext = string.split(pathext, os.pathsep)
        for ext in pathext:
            if string.lower(ext) == string.lower(file[-len(ext):]):
                # The name already carries a known suffix: search for
                # it exactly as given.
                pathext = ['']
                break
        for directory in path:
            f = os.path.join(directory, file)
            for ext in pathext:
                fext = f + ext
                if os.path.isfile(fext):
                    return fext
        return None

else:

    def where_is(file, path=None, pathext=None):
        """Return the full path of `file` found on `path`, or None.

        path is a list of directories or a string of directories
        separated by os.pathsep; it defaults to the PATH environment
        variable.  pathext is accepted for compatibility with the
        win32 version and is not used.  The first regular file with
        any execute permission bit set wins.
        """
        if path is None:
            path = os.environ['PATH']
        if is_String(path):
            path = string.split(path, os.pathsep)
        # Execute permission for user, group or other.
        any_execute = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
        for directory in path:
            f = os.path.join(directory, file)
            if os.path.isfile(f):
                try:
                    st = os.stat(f)
                except OSError:
                    # The file could not be examined; keep looking.
                    continue
                if stat.S_IMODE(st[stat.ST_MODE]) & any_execute:
                    return f
        return None

    default_sleep_seconds = 1
572
573
574
try:
    import subprocess
except ImportError:
    # The subprocess module doesn't exist in this version of Python,
    # so we're going to cobble up something that looks just enough
    # like its API for our purposes below.
    import new

    # An empty module object that stands in for the real thing; only
    # the attributes assigned below exist on it.
    subprocess = new.module('subprocess')

    subprocess.PIPE = 'PIPE'
    subprocess.STDOUT = 'STDOUT'
    subprocess.mswindows = (sys.platform == 'win32')

    try:
        import popen2
        popen2.Popen3
    except AttributeError:
        # popen2 has no Popen3 class: emulate one on top of os.popen3().
        class Popen3:
            """Minimal Popen stand-in built on os.popen3()."""
            universal_newlines = 1
            def __init__(self, command, **kw):
                """Start `command`; keyword arguments are ignored."""
                if sys.platform == 'win32' and command[0] == '"':
                    command = '"' + command + '"'
                (stdin, stdout, stderr) = os.popen3(' ' + command)
                self.stdin = stdin
                self.stdout = stdout
                self.stderr = stderr
            def close_output(self):
                """Close stdout and stderr, saving the value returned by
                closing stderr as self.resultcode."""
                self.stdout.close()
                self.resultcode = self.stderr.close()
            def wait(self):
                """Return the exit status, or the terminating signal
                number, decoded from self.resultcode (None if neither
                applies).  self.resultcode is only assigned by
                close_output()."""
                resultcode = self.resultcode
                if os.WIFEXITED(resultcode):
                    return os.WEXITSTATUS(resultcode)
                elif os.WIFSIGNALED(resultcode):
                    return os.WTERMSIG(resultcode)
                else:
                    return None

    else:
        try:
            popen2.Popen4
        except AttributeError:
            # A cribbed Popen4 class, with some retrofitted code from
            # the Python 1.5 Popen3 class methods to do certain things
            # by hand.
            class Popen4(popen2.Popen3):
                """Child process whose stdout and stderr share a single
                pipe (self.fromchild)."""
                childerr = None

                def __init__(self, cmd, bufsize=-1):
                    """Fork and exec `cmd` with its stdin and its combined
                    stdout/stderr connected to pipes."""
                    p2cread, p2cwrite = os.pipe()
                    c2pread, c2pwrite = os.pipe()
                    self.pid = os.fork()
                    if self.pid == 0:
                        # Child
                        # Wire the pipe ends up as stdin, stdout and
                        # stderr, then close every other descriptor.
                        os.dup2(p2cread, 0)
                        os.dup2(c2pwrite, 1)
                        os.dup2(c2pwrite, 2)
                        for i in range(3, popen2.MAXFD):
                            try:
                                os.close(i)
                            except: pass
                        try:
                            os.execvp(cmd[0], cmd)
                        finally:
                            # Reached only if the exec failed.
                            os._exit(1)
                        # Shouldn't come here, I guess
                        os._exit(1)
                    # Parent: keep only its own ends of the pipes.
                    os.close(p2cread)
                    self.tochild = os.fdopen(p2cwrite, 'w', bufsize)
                    os.close(c2pwrite)
                    self.fromchild = os.fdopen(c2pread, 'r', bufsize)
                    popen2._active.append(self)

            popen2.Popen4 = Popen4

        class Popen3(popen2.Popen3, popen2.Popen4):
            """Popen stand-in built on the popen2 classes, exposing
            stdin/stdout/stderr attributes."""
            universal_newlines = 1
            def __init__(self, command, **kw):
                """Start `command`.  The only keyword examined is
                stderr: the value 'STDOUT' selects the Popen4
                initializer, which merges stderr into stdout."""
                if kw.get('stderr') == 'STDOUT':
                    apply(popen2.Popen4.__init__, (self, command, 1))
                else:
                    apply(popen2.Popen3.__init__, (self, command, 1))
                self.stdin = self.tochild
                self.stdout = self.fromchild
                self.stderr = self.childerr
            def wait(self, *args, **kw):
                """Wait for the child and return its exit status, or the
                terminating signal number (None if neither applies)."""
                resultcode = apply(popen2.Popen3.wait, (self,)+args, kw)
                if os.WIFEXITED(resultcode):
                    return os.WEXITSTATUS(resultcode)
                elif os.WIFSIGNALED(resultcode):
                    return os.WTERMSIG(resultcode)
                else:
                    return None

    # Whichever Popen3 was defined above becomes subprocess.Popen.
    subprocess.Popen = Popen3
671
672
673
674# From Josiah Carlson,
675# ASPN : Python Cookbook : Module to allow Asynchronous subprocess use on Windows and Posix platforms
676# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/440554
677
# Module-level alias for subprocess.PIPE.
PIPE = subprocess.PIPE

# Platform-specific modules needed by the Popen send()/_recv() methods
# defined below.
if subprocess.mswindows:
    from win32file import ReadFile, WriteFile
    from win32pipe import PeekNamedPipe
    import msvcrt
else:
    import select
    import fcntl

    # Supply the F_GETFL / F_SETFL constants when this version of the
    # fcntl module does not define them.
    try:                    fcntl.F_GETFL
    except AttributeError:  fcntl.F_GETFL = 3

    try:                    fcntl.F_SETFL
    except AttributeError:  fcntl.F_SETFL = 4
693
class Popen(subprocess.Popen):
    """subprocess.Popen with send/receive methods that poll the child's
    pipes instead of blocking on them.

    send() and _recv() have separate Windows and non-Windows
    implementations, selected when the class is defined.
    """
    def recv(self, maxsize=None):
        """Return data currently available on the child's stdout."""
        return self._recv('stdout', maxsize)

    def recv_err(self, maxsize=None):
        """Return data currently available on the child's stderr."""
        return self._recv('stderr', maxsize)

    def send_recv(self, input='', maxsize=None):
        """Send `input`, then return a tuple of the send() result and
        whatever recv() and recv_err() return."""
        return self.send(input), self.recv(maxsize), self.recv_err(maxsize)

    def get_conn_maxsize(self, which, maxsize):
        """Return (pipe, maxsize) for the attribute named `which`.

        maxsize defaults to 1024 when None and is raised to 1 when
        smaller than that.
        """
        if maxsize is None:
            maxsize = 1024
        elif maxsize < 1:
            maxsize = 1
        return getattr(self, which), maxsize

    def _close(self, which):
        """Close the pipe named `which` and set the attribute to None.
        Returns None, which callers pass on as their own result."""
        getattr(self, which).close()
        setattr(self, which, None)

    if subprocess.mswindows:
        def send(self, input):
            """Write `input` to the child's stdin.

            Returns the count reported by WriteFile, or None when stdin
            is not open or had to be closed because of an error.
            """
            if not self.stdin:
                return None

            try:
                x = msvcrt.get_osfhandle(self.stdin.fileno())
                (errCode, written) = WriteFile(x, input)
            except ValueError:
                return self._close('stdin')
            except (subprocess.pywintypes.error, Exception), why:
                # 109 is presumably the Windows "broken pipe" error
                # code -- TODO confirm.  Anything else is re-raised.
                if why[0] in (109, errno.ESHUTDOWN):
                    return self._close('stdin')
                raise

            return written

        def _recv(self, which, maxsize):
            """Read up to maxsize bytes from the pipe named `which`.

            Returns the data read (whatever PeekNamedPipe returned when
            nothing was available), or None when the pipe is not open
            or had to be closed because of an error.
            """
            conn, maxsize = self.get_conn_maxsize(which, maxsize)
            if conn is None:
                return None

            try:
                x = msvcrt.get_osfhandle(conn.fileno())
                # Peek first to learn how much can be read, and only
                # call ReadFile when there is something to read.
                (read, nAvail, nMessage) = PeekNamedPipe(x, 0)
                if maxsize < nAvail:
                    nAvail = maxsize
                if nAvail > 0:
                    (errCode, read) = ReadFile(x, nAvail, None)
            except ValueError:
                return self._close(which)
            except (subprocess.pywintypes.error, Exception), why:
                # 109 is presumably the Windows "broken pipe" error
                # code -- TODO confirm.  Anything else is re-raised.
                if why[0] in (109, errno.ESHUTDOWN):
                    return self._close(which)
                raise

            #if self.universal_newlines:
            #    read = self._translate_newlines(read)
            return read

    else:
        def send(self, input):
            """Write `input` to the child's stdin.

            Returns the number of bytes written, 0 when stdin is not
            ready for writing, or None when stdin is not open or was
            closed because of a broken pipe.
            """
            if not self.stdin:
                return None

            # Poll with a zero timeout so this call never waits.
            if not select.select([], [self.stdin], [], 0)[1]:
                return 0

            try:
                written = os.write(self.stdin.fileno(), input)
            except OSError, why:
                if why[0] == errno.EPIPE: #broken pipe
                    return self._close('stdin')
                raise

            return written

        def _recv(self, which, maxsize):
            """Read up to maxsize bytes from the pipe named `which`.

            Returns the data read, '' when nothing is ready, or None
            when the pipe is not open or an empty read caused it to be
            closed.
            """
            conn, maxsize = self.get_conn_maxsize(which, maxsize)
            if conn is None:
                return None

            # Switch the pipe to non-blocking mode for this read; the
            # original flags are put back in the finally clause below.
            try:
                flags = fcntl.fcntl(conn, fcntl.F_GETFL)
            except TypeError:
                flags = None
            else:
                if not conn.closed:
                    fcntl.fcntl(conn, fcntl.F_SETFL, flags| os.O_NONBLOCK)

            try:
                # Poll with a zero timeout so this call never waits.
                if not select.select([conn], [], [], 0)[0]:
                    return ''

                r = conn.read(maxsize)
                if not r:
                    return self._close(which)

                #if self.universal_newlines:
                #    r = self._translate_newlines(r)
                return r
            finally:
                if not conn.closed and not flags is None:
                    fcntl.fcntl(conn, fcntl.F_SETFL, flags)
799
# Text of the exception raised when the other end of a pipe has gone.
disconnect_message = "Other end disconnected!"

def recv_some(p, t=.1, e=1, tr=5, stderr=0):
    """Collect output from `p` for about `t` seconds and return it.

    Data is read with p.recv(), or p.recv_err() when `stderr` is true.
    Reading continues past the deadline for as long as each read
    returns data.  When a read returns nothing, the function sleeps
    for the remaining time divided by `tr` (at least 1) before trying
    again.

    A read returning None ends the collection: an Exception carrying
    disconnect_message is raised when `e` is true, otherwise whatever
    was collected so far is returned.
    """
    if tr < 1:
        tr = 1
    deadline = time.time() + t
    chunks = []
    receive = p.recv
    if stderr:
        receive = p.recv_err
    data = ''
    while data or time.time() < deadline:
        data = receive()
        if data is None:
            if e:
                raise Exception(disconnect_message)
            break
        if data:
            chunks.append(data)
        else:
            time.sleep(max((deadline - time.time()) / tr, 0))
    return ''.join(chunks)
823
# TODO(3.0): rewrite to use memoryview()
def send_all(p, data):
    """Send all of `data` through p.send(), retrying until none is left.

    Raises an Exception carrying disconnect_message if p.send()
    returns None.
    """
    remaining = data
    while len(remaining) > 0:
        count = p.send(remaining)
        if count is None:
            raise Exception(disconnect_message)
        # Continue from `count` onwards using a buffer object rather
        # than slicing the data.
        remaining = buffer(remaining, count)
831
832
833
# Define a placeholder "object" base class on Pythons that have no
# built-in one, so that the class statement below still works.
try:
    object
except NameError:
    class object:
        pass
839
840
841
842class TestCmd(object):
843    """Class TestCmd
844    """
845
846    def __init__(self, description = None,
847                       program = None,
848                       interpreter = None,
849                       workdir = None,
850                       subdir = None,
851                       verbose = None,
852                       match = None,
853                       diff = None,
854                       combine = 0,
855                       universal_newlines = 1):
856        self._cwd = os.getcwd()
857        self.description_set(description)
858        self.program_set(program)
859        self.interpreter_set(interpreter)
860        if verbose is None:
861            try:
862                verbose = max( 0, int(os.environ.get('TESTCMD_VERBOSE', 0)) )
863            except ValueError:
864                verbose = 0
865        self.verbose_set(verbose)
866        self.combine = combine
867        self.universal_newlines = universal_newlines
868        if match is not None:
869            self.match_function = match
870        else:
871            self.match_function = match_re
872        if diff is not None:
873            self.diff_function = diff
874        else:
875            try:
876                difflib
877            except NameError:
878                pass
879            else:
880                self.diff_function = simple_diff
881                #self.diff_function = difflib.context_diff
882                #self.diff_function = difflib.unified_diff
883        self._dirlist = []
884        self._preserve = {'pass_test': 0, 'fail_test': 0, 'no_result': 0}
885        if os.environ.has_key('PRESERVE') and not os.environ['PRESERVE'] is '':
886            self._preserve['pass_test'] = os.environ['PRESERVE']
887            self._preserve['fail_test'] = os.environ['PRESERVE']
888            self._preserve['no_result'] = os.environ['PRESERVE']
889        else:
890            try:
891                self._preserve['pass_test'] = os.environ['PRESERVE_PASS']
892            except KeyError:
893                pass
894            try:
895                self._preserve['fail_test'] = os.environ['PRESERVE_FAIL']
896            except KeyError:
897                pass
898            try:
899                self._preserve['no_result'] = os.environ['PRESERVE_NO_RESULT']
900            except KeyError:
901                pass
902        self._stdout = []
903        self._stderr = []
904        self.status = None
905        self.condition = 'no_result'
906        self.workdir_set(workdir)
907        self.subdir(subdir)
908
    def __del__(self):
        """Run cleanup() when the object is destroyed."""
        self.cleanup()
911
912    def __repr__(self):
913        return "%x" % id(self)
914
915    banner_char = '='
916    banner_width = 80
917
918    def banner(self, s, width=None):
919        if width is None:
920            width = self.banner_width
921        return s + self.banner_char * (width - len(s))
922
923    if os.name == 'posix':
924
925        def escape(self, arg):
926            "escape shell special characters"
927            slash = '\\'
928            special = '"$'
929
930            arg = string.replace(arg, slash, slash+slash)
931            for c in special:
932                arg = string.replace(arg, c, slash+c)
933
934            if re_space.search(arg):
935                arg = '"' + arg + '"'
936            return arg
937
938    else:
939
940        # Windows does not allow special characters in file names
941        # anyway, so no need for an escape function, we will just quote
942        # the arg.
943        def escape(self, arg):
944            if re_space.search(arg):
945                arg = '"' + arg + '"'
946            return arg
947
948    def canonicalize(self, path):
949        if is_List(path):
950            path = apply(os.path.join, tuple(path))
951        if not os.path.isabs(path):
952            path = os.path.join(self.workdir, path)
953        return path
954
955    def chmod(self, path, mode):
956        """Changes permissions on the specified file or directory
957        path name."""
958        path = self.canonicalize(path)
959        os.chmod(path, mode)
960
961    def cleanup(self, condition = None):
962        """Removes any temporary working directories for the specified
963        TestCmd environment.  If the environment variable PRESERVE was
964        set when the TestCmd environment was created, temporary working
965        directories are not removed.  If any of the environment variables
966        PRESERVE_PASS, PRESERVE_FAIL, or PRESERVE_NO_RESULT were set
967        when the TestCmd environment was created, then temporary working
968        directories are not removed if the test passed, failed, or had
969        no result, respectively.  Temporary working directories are also
970        preserved for conditions specified via the preserve method.
971
972        Typically, this method is not called directly, but is used when
973        the script exits to clean up temporary working directories as
974        appropriate for the exit status.
975        """
976        if not self._dirlist:
977            return
978        os.chdir(self._cwd)
979        self.workdir = None
980        if condition is None:
981            condition = self.condition
982        if self._preserve[condition]:
983            for dir in self._dirlist:
984                print "Preserved directory", dir
985        else:
986            list = self._dirlist[:]
987            list.reverse()
988            for dir in list:
989                self.writable(dir, 1)
990                shutil.rmtree(dir, ignore_errors = 1)
991            self._dirlist = []
992
993        try:
994            global _Cleanup
995            _Cleanup.remove(self)
996        except (AttributeError, ValueError):
997            pass
998
999    def command_args(self, program = None,
1000                           interpreter = None,
1001                           arguments = None):
1002        if program:
1003            if type(program) == type('') and not os.path.isabs(program):
1004                program = os.path.join(self._cwd, program)
1005        else:
1006            program = self.program
1007            if not interpreter:
1008                interpreter = self.interpreter
1009        if not type(program) in [type([]), type(())]:
1010            program = [program]
1011        cmd = list(program)
1012        if interpreter:
1013            if not type(interpreter) in [type([]), type(())]:
1014                interpreter = [interpreter]
1015            cmd = list(interpreter) + cmd
1016        if arguments:
1017            if type(arguments) == type(''):
1018                arguments = string.split(arguments)
1019            cmd.extend(arguments)
1020        return cmd
1021
1022    def description_set(self, description):
1023        """Set the description of the functionality being tested.
1024        """
1025        self.description = description
1026
1027    try:
1028        difflib
1029    except NameError:
1030        def diff(self, a, b, name, *args, **kw):
1031            print self.banner('Expected %s' % name)
1032            print a
1033            print self.banner('Actual %s' % name)
1034            print b
1035    else:
1036        def diff(self, a, b, name, *args, **kw):
1037            print self.banner(name)
1038            args = (a.splitlines(), b.splitlines()) + args
1039            lines = apply(self.diff_function, args, kw)
1040            for l in lines:
1041                print l
1042
1043    def fail_test(self, condition = 1, function = None, skip = 0):
1044        """Cause the test to fail.
1045        """
1046        if not condition:
1047            return
1048        self.condition = 'fail_test'
1049        fail_test(self = self,
1050                  condition = condition,
1051                  function = function,
1052                  skip = skip)
1053
1054    def interpreter_set(self, interpreter):
1055        """Set the program to be used to interpret the program
1056        under test as a script.
1057        """
1058        self.interpreter = interpreter
1059
1060    def match(self, lines, matches):
1061        """Compare actual and expected file contents.
1062        """
1063        return self.match_function(lines, matches)
1064
1065    def match_exact(self, lines, matches):
1066        """Compare actual and expected file contents.
1067        """
1068        return match_exact(lines, matches)
1069
1070    def match_re(self, lines, res):
1071        """Compare actual and expected file contents.
1072        """
1073        return match_re(lines, res)
1074
1075    def match_re_dotall(self, lines, res):
1076        """Compare actual and expected file contents.
1077        """
1078        return match_re_dotall(lines, res)
1079
1080    def no_result(self, condition = 1, function = None, skip = 0):
1081        """Report that the test could not be run.
1082        """
1083        if not condition:
1084            return
1085        self.condition = 'no_result'
1086        no_result(self = self,
1087                  condition = condition,
1088                  function = function,
1089                  skip = skip)
1090
1091    def pass_test(self, condition = 1, function = None):
1092        """Cause the test to pass.
1093        """
1094        if not condition:
1095            return
1096        self.condition = 'pass_test'
1097        pass_test(self = self, condition = condition, function = function)
1098
1099    def preserve(self, *conditions):
1100        """Arrange for the temporary working directories for the
1101        specified TestCmd environment to be preserved for one or more
1102        conditions.  If no conditions are specified, arranges for
1103        the temporary working directories to be preserved for all
1104        conditions.
1105        """
1106        if conditions is ():
1107            conditions = ('pass_test', 'fail_test', 'no_result')
1108        for cond in conditions:
1109            self._preserve[cond] = 1
1110
1111    def program_set(self, program):
1112        """Set the executable program or script to be tested.
1113        """
1114        if program and not os.path.isabs(program):
1115            program = os.path.join(self._cwd, program)
1116        self.program = program
1117
1118    def read(self, file, mode = 'rb'):
1119        """Reads and returns the contents of the specified file name.
1120        The file name may be a list, in which case the elements are
1121        concatenated with the os.path.join() method.  The file is
1122        assumed to be under the temporary working directory unless it
1123        is an absolute path name.  The I/O mode for the file may
1124        be specified; it must begin with an 'r'.  The default is
1125        'rb' (binary read).
1126        """
1127        file = self.canonicalize(file)
1128        if mode[0] != 'r':
1129            raise ValueError, "mode must begin with 'r'"
1130        with open(file, mode) as f:
1131            result = f.read()
1132        return result
1133
1134    def rmdir(self, dir):
1135        """Removes the specified dir name.
1136        The dir name may be a list, in which case the elements are
1137        concatenated with the os.path.join() method.  The dir is
1138        assumed to be under the temporary working directory unless it
1139        is an absolute path name.
1140        The dir must be empty.
1141        """
1142        dir = self.canonicalize(dir)
1143        os.rmdir(dir)
1144
1145    def start(self, program = None,
1146                    interpreter = None,
1147                    arguments = None,
1148                    universal_newlines = None,
1149                    **kw):
1150        """
1151        Starts a program or script for the test environment.
1152
1153        The specified program will have the original directory
1154        prepended unless it is enclosed in a [list].
1155        """
1156        cmd = self.command_args(program, interpreter, arguments)
1157        cmd_string = string.join(map(self.escape, cmd), ' ')
1158        if self.verbose:
1159            sys.stderr.write(cmd_string + "\n")
1160        if universal_newlines is None:
1161            universal_newlines = self.universal_newlines
1162
1163        # On Windows, if we make stdin a pipe when we plan to send
1164        # no input, and the test program exits before
1165        # Popen calls msvcrt.open_osfhandle, that call will fail.
1166        # So don't use a pipe for stdin if we don't need one.
1167        stdin = kw.get('stdin', None)
1168        if stdin is not None:
1169            stdin = subprocess.PIPE
1170
1171        combine = kw.get('combine', self.combine)
1172        if combine:
1173            stderr_value = subprocess.STDOUT
1174        else:
1175            stderr_value = subprocess.PIPE
1176
1177        return Popen(cmd,
1178                     stdin=stdin,
1179                     stdout=subprocess.PIPE,
1180                     stderr=stderr_value,
1181                     universal_newlines=universal_newlines)
1182
1183    def finish(self, popen, **kw):
1184        """
1185        Finishes and waits for the process being run under control of
1186        the specified popen argument, recording the exit status,
1187        standard output and error output.
1188        """
1189        popen.stdin.close()
1190        self.status = popen.wait()
1191        if not self.status:
1192            self.status = 0
1193        self._stdout.append(popen.stdout.read())
1194        if popen.stderr:
1195            stderr = popen.stderr.read()
1196        else:
1197            stderr = ''
1198        self._stderr.append(stderr)
1199
1200    def run(self, program = None,
1201                  interpreter = None,
1202                  arguments = None,
1203                  chdir = None,
1204                  stdin = None,
1205                  universal_newlines = None):
1206        """Runs a test of the program or script for the test
1207        environment.  Standard output and error output are saved for
1208        future retrieval via the stdout() and stderr() methods.
1209
1210        The specified program will have the original directory
1211        prepended unless it is enclosed in a [list].
1212        """
1213        if chdir:
1214            oldcwd = os.getcwd()
1215            if not os.path.isabs(chdir):
1216                chdir = os.path.join(self.workpath(chdir))
1217            if self.verbose:
1218                sys.stderr.write("chdir(" + chdir + ")\n")
1219            os.chdir(chdir)
1220        p = self.start(program,
1221                       interpreter,
1222                       arguments,
1223                       universal_newlines,
1224                       stdin=stdin)
1225        if stdin:
1226            if is_List(stdin):
1227                for line in stdin:
1228                    p.stdin.write(line)
1229            else:
1230                p.stdin.write(stdin)
1231            p.stdin.close()
1232
1233        out = p.stdout.read()
1234        if p.stderr is None:
1235            err = ''
1236        else:
1237            err = p.stderr.read()
1238        try:
1239            close_output = p.close_output
1240        except AttributeError:
1241            p.stdout.close()
1242            if not p.stderr is None:
1243                p.stderr.close()
1244        else:
1245            close_output()
1246
1247        self._stdout.append(out)
1248        self._stderr.append(err)
1249
1250        self.status = p.wait()
1251        if not self.status:
1252            self.status = 0
1253
1254        if chdir:
1255            os.chdir(oldcwd)
1256        if self.verbose >= 2:
1257            write = sys.stdout.write
1258            write('============ STATUS: %d\n' % self.status)
1259            out = self.stdout()
1260            if out or self.verbose >= 3:
1261                write('============ BEGIN STDOUT (len=%d):\n' % len(out))
1262                write(out)
1263                write('============ END STDOUT\n')
1264            err = self.stderr()
1265            if err or self.verbose >= 3:
1266                write('============ BEGIN STDERR (len=%d)\n' % len(err))
1267                write(err)
1268                write('============ END STDERR\n')
1269
1270    def sleep(self, seconds = default_sleep_seconds):
1271        """Sleeps at least the specified number of seconds.  If no
1272        number is specified, sleeps at least the minimum number of
1273        seconds necessary to advance file time stamps on the current
1274        system.  Sleeping more seconds is all right.
1275        """
1276        time.sleep(seconds)
1277
1278    def stderr(self, run = None):
1279        """Returns the error output from the specified run number.
1280        If there is no specified run number, then returns the error
1281        output of the last run.  If the run number is less than zero,
1282        then returns the error output from that many runs back from the
1283        current run.
1284        """
1285        if not run:
1286            run = len(self._stderr)
1287        elif run < 0:
1288            run = len(self._stderr) + run
1289        run = run - 1
1290        return self._stderr[run]
1291
1292    def stdout(self, run = None):
1293        """Returns the standard output from the specified run number.
1294        If there is no specified run number, then returns the standard
1295        output of the last run.  If the run number is less than zero,
1296        then returns the standard output from that many runs back from
1297        the current run.
1298        """
1299        if not run:
1300            run = len(self._stdout)
1301        elif run < 0:
1302            run = len(self._stdout) + run
1303        run = run - 1
1304        return self._stdout[run]
1305
1306    def subdir(self, *subdirs):
1307        """Create new subdirectories under the temporary working
1308        directory, one for each argument.  An argument may be a list,
1309        in which case the list elements are concatenated using the
1310        os.path.join() method.  Subdirectories multiple levels deep
1311        must be created using a separate argument for each level:
1312
1313                test.subdir('sub', ['sub', 'dir'], ['sub', 'dir', 'ectory'])
1314
1315        Returns the number of subdirectories actually created.
1316        """
1317        count = 0
1318        for sub in subdirs:
1319            if sub is None:
1320                continue
1321            if is_List(sub):
1322                sub = apply(os.path.join, tuple(sub))
1323            new = os.path.join(self.workdir, sub)
1324            try:
1325                os.mkdir(new)
1326            except OSError:
1327                pass
1328            else:
1329                count = count + 1
1330        return count
1331
1332    def symlink(self, target, link):
1333        """Creates a symlink to the specified target.
1334        The link name may be a list, in which case the elements are
1335        concatenated with the os.path.join() method.  The link is
1336        assumed to be under the temporary working directory unless it
1337        is an absolute path name. The target is *not* assumed to be
1338        under the temporary working directory.
1339        """
1340        link = self.canonicalize(link)
1341        os.symlink(target, link)
1342
1343    def tempdir(self, path=None):
1344        """Creates a temporary directory.
1345        A unique directory name is generated if no path name is specified.
1346        The directory is created, and will be removed when the TestCmd
1347        object is destroyed.
1348        """
1349        if path is None:
1350            try:
1351                path = tempfile.mktemp(prefix=tempfile.template)
1352            except TypeError:
1353                path = tempfile.mktemp()
1354        os.mkdir(path)
1355
1356        # Symlinks in the path will report things
1357        # differently from os.getcwd(), so chdir there
1358        # and back to fetch the canonical path.
1359        cwd = os.getcwd()
1360        try:
1361            os.chdir(path)
1362            path = os.getcwd()
1363        finally:
1364            os.chdir(cwd)
1365
1366        # Uppercase the drive letter since the case of drive
1367        # letters is pretty much random on win32:
1368        drive,rest = os.path.splitdrive(path)
1369        if drive:
1370            path = string.upper(drive) + rest
1371
1372        #
1373        self._dirlist.append(path)
1374        global _Cleanup
1375        try:
1376            _Cleanup.index(self)
1377        except ValueError:
1378            _Cleanup.append(self)
1379
1380        return path
1381
1382    def touch(self, path, mtime=None):
1383        """Updates the modification time on the specified file or
1384        directory path name.  The default is to update to the
1385        current time if no explicit modification time is specified.
1386        """
1387        path = self.canonicalize(path)
1388        atime = os.path.getatime(path)
1389        if mtime is None:
1390            mtime = time.time()
1391        os.utime(path, (atime, mtime))
1392
1393    def unlink(self, file):
1394        """Unlinks the specified file name.
1395        The file name may be a list, in which case the elements are
1396        concatenated with the os.path.join() method.  The file is
1397        assumed to be under the temporary working directory unless it
1398        is an absolute path name.
1399        """
1400        file = self.canonicalize(file)
1401        os.unlink(file)
1402
1403    def verbose_set(self, verbose):
1404        """Set the verbose level.
1405        """
1406        self.verbose = verbose
1407
1408    def where_is(self, file, path=None, pathext=None):
1409        """Find an executable file.
1410        """
1411        if is_List(file):
1412            file = apply(os.path.join, tuple(file))
1413        if not os.path.isabs(file):
1414            file = where_is(file, path, pathext)
1415        return file
1416
1417    def workdir_set(self, path):
1418        """Creates a temporary working directory with the specified
1419        path name.  If the path is a null string (''), a unique
1420        directory name is created.
1421        """
1422        if (path != None):
1423            if path == '':
1424                path = None
1425            path = self.tempdir(path)
1426        self.workdir = path
1427
1428    def workpath(self, *args):
1429        """Returns the absolute path name to a subdirectory or file
1430        within the current temporary working directory.  Concatenates
1431        the temporary working directory name with the specified
1432        arguments using the os.path.join() method.
1433        """
1434        return apply(os.path.join, (self.workdir,) + tuple(args))
1435
1436    def readable(self, top, read=1):
1437        """Make the specified directory tree readable (read == 1)
1438        or not (read == None).
1439
1440        This method has no effect on Windows systems, which use a
1441        completely different mechanism to control file readability.
1442        """
1443
1444        if sys.platform == 'win32':
1445            return
1446
1447        if read:
1448            def do_chmod(fname):
1449                try: st = os.stat(fname)
1450                except OSError: pass
1451                else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|stat.S_IREAD))
1452        else:
1453            def do_chmod(fname):
1454                try: st = os.stat(fname)
1455                except OSError: pass
1456                else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~stat.S_IREAD))
1457
1458        if os.path.isfile(top):
1459            # If it's a file, that's easy, just chmod it.
1460            do_chmod(top)
1461        elif read:
1462            # It's a directory and we're trying to turn on read
1463            # permission, so it's also pretty easy, just chmod the
1464            # directory and then chmod every entry on our walk down the
1465            # tree.  Because os.path.walk() is top-down, we'll enable
1466            # read permission on any directories that have it disabled
1467            # before os.path.walk() tries to list their contents.
1468            do_chmod(top)
1469
1470            def chmod_entries(arg, dirname, names, do_chmod=do_chmod):
1471                for n in names:
1472                    do_chmod(os.path.join(dirname, n))
1473
1474            os.path.walk(top, chmod_entries, None)
1475        else:
1476            # It's a directory and we're trying to turn off read
1477            # permission, which means we have to chmod the directoreis
1478            # in the tree bottom-up, lest disabling read permission from
1479            # the top down get in the way of being able to get at lower
1480            # parts of the tree.  But os.path.walk() visits things top
1481            # down, so we just use an object to collect a list of all
1482            # of the entries in the tree, reverse the list, and then
1483            # chmod the reversed (bottom-up) list.
1484            col = Collector(top)
1485            os.path.walk(top, col, None)
1486            col.entries.reverse()
1487            for d in col.entries: do_chmod(d)
1488
1489    def writable(self, top, write=1):
1490        """Make the specified directory tree writable (write == 1)
1491        or not (write == None).
1492        """
1493
1494        if sys.platform == 'win32':
1495
1496            if write:
1497                def do_chmod(fname):
1498                    try: os.chmod(fname, stat.S_IWRITE)
1499                    except OSError: pass
1500            else:
1501                def do_chmod(fname):
1502                    try: os.chmod(fname, stat.S_IREAD)
1503                    except OSError: pass
1504
1505        else:
1506
1507            if write:
1508                def do_chmod(fname):
1509                    try: st = os.stat(fname)
1510                    except OSError: pass
1511                    else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|0200))
1512            else:
1513                def do_chmod(fname):
1514                    try: st = os.stat(fname)
1515                    except OSError: pass
1516                    else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~0200))
1517
1518        if os.path.isfile(top):
1519            do_chmod(top)
1520        else:
1521            col = Collector(top)
1522            os.path.walk(top, col, None)
1523            for d in col.entries: do_chmod(d)
1524
1525    def executable(self, top, execute=1):
1526        """Make the specified directory tree executable (execute == 1)
1527        or not (execute == None).
1528
1529        This method has no effect on Windows systems, which use a
1530        completely different mechanism to control file executability.
1531        """
1532
1533        if sys.platform == 'win32':
1534            return
1535
1536        if execute:
1537            def do_chmod(fname):
1538                try: st = os.stat(fname)
1539                except OSError: pass
1540                else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|stat.S_IEXEC))
1541        else:
1542            def do_chmod(fname):
1543                try: st = os.stat(fname)
1544                except OSError: pass
1545                else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~stat.S_IEXEC))
1546
1547        if os.path.isfile(top):
1548            # If it's a file, that's easy, just chmod it.
1549            do_chmod(top)
1550        elif execute:
1551            # It's a directory and we're trying to turn on execute
1552            # permission, so it's also pretty easy, just chmod the
1553            # directory and then chmod every entry on our walk down the
1554            # tree.  Because os.path.walk() is top-down, we'll enable
1555            # execute permission on any directories that have it disabled
1556            # before os.path.walk() tries to list their contents.
1557            do_chmod(top)
1558
1559            def chmod_entries(arg, dirname, names, do_chmod=do_chmod):
1560                for n in names:
1561                    do_chmod(os.path.join(dirname, n))
1562
1563            os.path.walk(top, chmod_entries, None)
1564        else:
1565            # It's a directory and we're trying to turn off execute
1566            # permission, which means we have to chmod the directories
1567            # in the tree bottom-up, lest disabling execute permission from
1568            # the top down get in the way of being able to get at lower
1569            # parts of the tree.  But os.path.walk() visits things top
1570            # down, so we just use an object to collect a list of all
1571            # of the entries in the tree, reverse the list, and then
1572            # chmod the reversed (bottom-up) list.
1573            col = Collector(top)
1574            os.path.walk(top, col, None)
1575            col.entries.reverse()
1576            for d in col.entries: do_chmod(d)
1577
1578    def write(self, file, content, mode = 'wb'):
1579        """Writes the specified content text (second argument) to the
1580        specified file name (first argument).  The file name may be
1581        a list, in which case the elements are concatenated with the
1582        os.path.join() method.  The file is created under the temporary
1583        working directory.  Any subdirectories in the path must already
1584        exist.  The I/O mode for the file may be specified; it must
1585        begin with a 'w'.  The default is 'wb' (binary write).
1586        """
1587        file = self.canonicalize(file)
1588        if mode[0] != 'w':
1589            raise ValueError, "mode must begin with 'w'"
1590        with open(file, mode) as f:
1591            f.write(content)
1592
1593# Local Variables:
1594# tab-width:4
1595# indent-tabs-mode:nil
1596# End:
1597# vim: set expandtab tabstop=4 shiftwidth=4:
1598