• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# tempfile.py unit tests.
2import tempfile
3import errno
4import io
5import os
6import pathlib
7import sys
8import re
9import warnings
10import contextlib
11import stat
12import types
13import weakref
14import subprocess
15from unittest import mock
16
17import unittest
18from test import support
19from test.support import os_helper
20from test.support import script_helper
21from test.support import warnings_helper
22
23
24has_textmode = (tempfile._text_openflags != tempfile._bin_openflags)
25has_spawnl = hasattr(os, 'spawnl')
26
27# TEST_FILES may need to be tweaked for systems depending on the maximum
28# number of files that can be opened at one time (see ulimit -n)
29if sys.platform.startswith('openbsd'):
30    TEST_FILES = 48
31else:
32    TEST_FILES = 100
33
34# This is organized as one test for each chunk of code in tempfile.py,
35# in order of their appearance in the file.  Testing which requires
36# threads is not done here.
37
38class TestLowLevelInternals(unittest.TestCase):
39    def test_infer_return_type_singles(self):
40        self.assertIs(str, tempfile._infer_return_type(''))
41        self.assertIs(bytes, tempfile._infer_return_type(b''))
42        self.assertIs(str, tempfile._infer_return_type(None))
43
44    def test_infer_return_type_multiples(self):
45        self.assertIs(str, tempfile._infer_return_type('', ''))
46        self.assertIs(bytes, tempfile._infer_return_type(b'', b''))
47        with self.assertRaises(TypeError):
48            tempfile._infer_return_type('', b'')
49        with self.assertRaises(TypeError):
50            tempfile._infer_return_type(b'', '')
51
52    def test_infer_return_type_multiples_and_none(self):
53        self.assertIs(str, tempfile._infer_return_type(None, ''))
54        self.assertIs(str, tempfile._infer_return_type('', None))
55        self.assertIs(str, tempfile._infer_return_type(None, None))
56        self.assertIs(bytes, tempfile._infer_return_type(b'', None))
57        self.assertIs(bytes, tempfile._infer_return_type(None, b''))
58        with self.assertRaises(TypeError):
59            tempfile._infer_return_type('', None, b'')
60        with self.assertRaises(TypeError):
61            tempfile._infer_return_type(b'', None, '')
62
63    def test_infer_return_type_pathlib(self):
64        self.assertIs(str, tempfile._infer_return_type(pathlib.Path('/')))
65
66    def test_infer_return_type_pathlike(self):
67        class Path:
68            def __init__(self, path):
69                self.path = path
70
71            def __fspath__(self):
72                return self.path
73
74        self.assertIs(str, tempfile._infer_return_type(Path('/')))
75        self.assertIs(bytes, tempfile._infer_return_type(Path(b'/')))
76        self.assertIs(str, tempfile._infer_return_type('', Path('')))
77        self.assertIs(bytes, tempfile._infer_return_type(b'', Path(b'')))
78        self.assertIs(bytes, tempfile._infer_return_type(None, Path(b'')))
79        self.assertIs(str, tempfile._infer_return_type(None, Path('')))
80
81        with self.assertRaises(TypeError):
82            tempfile._infer_return_type('', Path(b''))
83        with self.assertRaises(TypeError):
84            tempfile._infer_return_type(b'', Path(''))
85
86# Common functionality.
87
88class BaseTestCase(unittest.TestCase):
89
90    str_check = re.compile(r"^[a-z0-9_-]{8}$")
91    b_check = re.compile(br"^[a-z0-9_-]{8}$")
92
93    def setUp(self):
94        self.enterContext(warnings_helper.check_warnings())
95        warnings.filterwarnings("ignore", category=RuntimeWarning,
96                                message="mktemp", module=__name__)
97
98    def nameCheck(self, name, dir, pre, suf):
99        (ndir, nbase) = os.path.split(name)
100        npre  = nbase[:len(pre)]
101        nsuf  = nbase[len(nbase)-len(suf):]
102
103        if dir is not None:
104            self.assertIs(
105                type(name),
106                str
107                if type(dir) is str or isinstance(dir, os.PathLike) else
108                bytes,
109                "unexpected return type",
110            )
111        if pre is not None:
112            self.assertIs(type(name), str if type(pre) is str else bytes,
113                          "unexpected return type")
114        if suf is not None:
115            self.assertIs(type(name), str if type(suf) is str else bytes,
116                          "unexpected return type")
117        if (dir, pre, suf) == (None, None, None):
118            self.assertIs(type(name), str, "default return type must be str")
119
120        # check for equality of the absolute paths!
121        self.assertEqual(os.path.abspath(ndir), os.path.abspath(dir),
122                         "file %r not in directory %r" % (name, dir))
123        self.assertEqual(npre, pre,
124                         "file %r does not begin with %r" % (nbase, pre))
125        self.assertEqual(nsuf, suf,
126                         "file %r does not end with %r" % (nbase, suf))
127
128        nbase = nbase[len(pre):len(nbase)-len(suf)]
129        check = self.str_check if isinstance(nbase, str) else self.b_check
130        self.assertTrue(check.match(nbase),
131                        "random characters %r do not match %r"
132                        % (nbase, check.pattern))
133
134
135class TestExports(BaseTestCase):
136    def test_exports(self):
137        # There are no surprising symbols in the tempfile module
138        dict = tempfile.__dict__
139
140        expected = {
141            "NamedTemporaryFile" : 1,
142            "TemporaryFile" : 1,
143            "mkstemp" : 1,
144            "mkdtemp" : 1,
145            "mktemp" : 1,
146            "TMP_MAX" : 1,
147            "gettempprefix" : 1,
148            "gettempprefixb" : 1,
149            "gettempdir" : 1,
150            "gettempdirb" : 1,
151            "tempdir" : 1,
152            "template" : 1,
153            "SpooledTemporaryFile" : 1,
154            "TemporaryDirectory" : 1,
155        }
156
157        unexp = []
158        for key in dict:
159            if key[0] != '_' and key not in expected:
160                unexp.append(key)
161        self.assertTrue(len(unexp) == 0,
162                        "unexpected keys: %s" % unexp)
163
164
165class TestRandomNameSequence(BaseTestCase):
166    """Test the internal iterator object _RandomNameSequence."""
167
168    def setUp(self):
169        self.r = tempfile._RandomNameSequence()
170        super().setUp()
171
172    def test_get_eight_char_str(self):
173        # _RandomNameSequence returns a eight-character string
174        s = next(self.r)
175        self.nameCheck(s, '', '', '')
176
177    def test_many(self):
178        # _RandomNameSequence returns no duplicate strings (stochastic)
179
180        dict = {}
181        r = self.r
182        for i in range(TEST_FILES):
183            s = next(r)
184            self.nameCheck(s, '', '', '')
185            self.assertNotIn(s, dict)
186            dict[s] = 1
187
188    def supports_iter(self):
189        # _RandomNameSequence supports the iterator protocol
190
191        i = 0
192        r = self.r
193        for s in r:
194            i += 1
195            if i == 20:
196                break
197
198    @support.requires_fork()
199    def test_process_awareness(self):
200        # ensure that the random source differs between
201        # child and parent.
202        read_fd, write_fd = os.pipe()
203        pid = None
204        try:
205            pid = os.fork()
206            if not pid:
207                # child process
208                os.close(read_fd)
209                os.write(write_fd, next(self.r).encode("ascii"))
210                os.close(write_fd)
211                # bypass the normal exit handlers- leave those to
212                # the parent.
213                os._exit(0)
214
215            # parent process
216            parent_value = next(self.r)
217            child_value = os.read(read_fd, len(parent_value)).decode("ascii")
218        finally:
219            if pid:
220                support.wait_process(pid, exitcode=0)
221
222            os.close(read_fd)
223            os.close(write_fd)
224        self.assertNotEqual(child_value, parent_value)
225
226
227
228class TestCandidateTempdirList(BaseTestCase):
229    """Test the internal function _candidate_tempdir_list."""
230
231    def test_nonempty_list(self):
232        # _candidate_tempdir_list returns a nonempty list of strings
233
234        cand = tempfile._candidate_tempdir_list()
235
236        self.assertFalse(len(cand) == 0)
237        for c in cand:
238            self.assertIsInstance(c, str)
239
240    def test_wanted_dirs(self):
241        # _candidate_tempdir_list contains the expected directories
242
243        # Make sure the interesting environment variables are all set.
244        with os_helper.EnvironmentVarGuard() as env:
245            for envname in 'TMPDIR', 'TEMP', 'TMP':
246                dirname = os.getenv(envname)
247                if not dirname:
248                    env[envname] = os.path.abspath(envname)
249
250            cand = tempfile._candidate_tempdir_list()
251
252            for envname in 'TMPDIR', 'TEMP', 'TMP':
253                dirname = os.getenv(envname)
254                if not dirname: raise ValueError
255                self.assertIn(dirname, cand)
256
257            try:
258                dirname = os.getcwd()
259            except (AttributeError, OSError):
260                dirname = os.curdir
261
262            self.assertIn(dirname, cand)
263
264            # Not practical to try to verify the presence of OS-specific
265            # paths in this list.
266
267
268# We test _get_default_tempdir some more by testing gettempdir.
269
270class TestGetDefaultTempdir(BaseTestCase):
271    """Test _get_default_tempdir()."""
272
273    def test_no_files_left_behind(self):
274        # use a private empty directory
275        with tempfile.TemporaryDirectory() as our_temp_directory:
276            # force _get_default_tempdir() to consider our empty directory
277            def our_candidate_list():
278                return [our_temp_directory]
279
280            with support.swap_attr(tempfile, "_candidate_tempdir_list",
281                                   our_candidate_list):
282                # verify our directory is empty after _get_default_tempdir()
283                tempfile._get_default_tempdir()
284                self.assertEqual(os.listdir(our_temp_directory), [])
285
286                def raise_OSError(*args, **kwargs):
287                    raise OSError()
288
289                with support.swap_attr(os, "open", raise_OSError):
290                    # test again with failing os.open()
291                    with self.assertRaises(FileNotFoundError):
292                        tempfile._get_default_tempdir()
293                    self.assertEqual(os.listdir(our_temp_directory), [])
294
295                with support.swap_attr(os, "write", raise_OSError):
296                    # test again with failing os.write()
297                    with self.assertRaises(FileNotFoundError):
298                        tempfile._get_default_tempdir()
299                    self.assertEqual(os.listdir(our_temp_directory), [])
300
301
302class TestGetCandidateNames(BaseTestCase):
303    """Test the internal function _get_candidate_names."""
304
305    def test_retval(self):
306        # _get_candidate_names returns a _RandomNameSequence object
307        obj = tempfile._get_candidate_names()
308        self.assertIsInstance(obj, tempfile._RandomNameSequence)
309
310    def test_same_thing(self):
311        # _get_candidate_names always returns the same object
312        a = tempfile._get_candidate_names()
313        b = tempfile._get_candidate_names()
314
315        self.assertTrue(a is b)
316
317
318@contextlib.contextmanager
319def _inside_empty_temp_dir():
320    dir = tempfile.mkdtemp()
321    try:
322        with support.swap_attr(tempfile, 'tempdir', dir):
323            yield
324    finally:
325        os_helper.rmtree(dir)
326
327
328def _mock_candidate_names(*names):
329    return support.swap_attr(tempfile,
330                             '_get_candidate_names',
331                             lambda: iter(names))
332
333
334class TestBadTempdir:
335
336    @unittest.skipIf(
337        support.is_emscripten, "Emscripten cannot remove write bits."
338    )
339    def test_read_only_directory(self):
340        with _inside_empty_temp_dir():
341            oldmode = mode = os.stat(tempfile.tempdir).st_mode
342            mode &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)
343            os.chmod(tempfile.tempdir, mode)
344            try:
345                if os.access(tempfile.tempdir, os.W_OK):
346                    self.skipTest("can't set the directory read-only")
347                with self.assertRaises(PermissionError):
348                    self.make_temp()
349                self.assertEqual(os.listdir(tempfile.tempdir), [])
350            finally:
351                os.chmod(tempfile.tempdir, oldmode)
352
353    def test_nonexisting_directory(self):
354        with _inside_empty_temp_dir():
355            tempdir = os.path.join(tempfile.tempdir, 'nonexistent')
356            with support.swap_attr(tempfile, 'tempdir', tempdir):
357                with self.assertRaises(FileNotFoundError):
358                    self.make_temp()
359
360    def test_non_directory(self):
361        with _inside_empty_temp_dir():
362            tempdir = os.path.join(tempfile.tempdir, 'file')
363            open(tempdir, 'wb').close()
364            with support.swap_attr(tempfile, 'tempdir', tempdir):
365                with self.assertRaises((NotADirectoryError, FileNotFoundError)):
366                    self.make_temp()
367
368
369class TestMkstempInner(TestBadTempdir, BaseTestCase):
370    """Test the internal function _mkstemp_inner."""
371
372    class mkstemped:
373        _bflags = tempfile._bin_openflags
374        _tflags = tempfile._text_openflags
375        _close = os.close
376        _unlink = os.unlink
377
378        def __init__(self, dir, pre, suf, bin):
379            if bin: flags = self._bflags
380            else:   flags = self._tflags
381
382            output_type = tempfile._infer_return_type(dir, pre, suf)
383            (self.fd, self.name) = tempfile._mkstemp_inner(dir, pre, suf, flags, output_type)
384
385        def write(self, str):
386            os.write(self.fd, str)
387
388        def __del__(self):
389            self._close(self.fd)
390            self._unlink(self.name)
391
392    def do_create(self, dir=None, pre=None, suf=None, bin=1):
393        output_type = tempfile._infer_return_type(dir, pre, suf)
394        if dir is None:
395            if output_type is str:
396                dir = tempfile.gettempdir()
397            else:
398                dir = tempfile.gettempdirb()
399        if pre is None:
400            pre = output_type()
401        if suf is None:
402            suf = output_type()
403        file = self.mkstemped(dir, pre, suf, bin)
404
405        self.nameCheck(file.name, dir, pre, suf)
406        return file
407
408    def test_basic(self):
409        # _mkstemp_inner can create files
410        self.do_create().write(b"blat")
411        self.do_create(pre="a").write(b"blat")
412        self.do_create(suf="b").write(b"blat")
413        self.do_create(pre="a", suf="b").write(b"blat")
414        self.do_create(pre="aa", suf=".txt").write(b"blat")
415
416    def test_basic_with_bytes_names(self):
417        # _mkstemp_inner can create files when given name parts all
418        # specified as bytes.
419        dir_b = tempfile.gettempdirb()
420        self.do_create(dir=dir_b, suf=b"").write(b"blat")
421        self.do_create(dir=dir_b, pre=b"a").write(b"blat")
422        self.do_create(dir=dir_b, suf=b"b").write(b"blat")
423        self.do_create(dir=dir_b, pre=b"a", suf=b"b").write(b"blat")
424        self.do_create(dir=dir_b, pre=b"aa", suf=b".txt").write(b"blat")
425        # Can't mix str & binary types in the args.
426        with self.assertRaises(TypeError):
427            self.do_create(dir="", suf=b"").write(b"blat")
428        with self.assertRaises(TypeError):
429            self.do_create(dir=dir_b, pre="").write(b"blat")
430        with self.assertRaises(TypeError):
431            self.do_create(dir=dir_b, pre=b"", suf="").write(b"blat")
432
433    def test_basic_many(self):
434        # _mkstemp_inner can create many files (stochastic)
435        extant = list(range(TEST_FILES))
436        for i in extant:
437            extant[i] = self.do_create(pre="aa")
438
439    def test_choose_directory(self):
440        # _mkstemp_inner can create files in a user-selected directory
441        dir = tempfile.mkdtemp()
442        try:
443            self.do_create(dir=dir).write(b"blat")
444            self.do_create(dir=pathlib.Path(dir)).write(b"blat")
445        finally:
446            support.gc_collect()  # For PyPy or other GCs.
447            os.rmdir(dir)
448
449    @os_helper.skip_unless_working_chmod
450    def test_file_mode(self):
451        # _mkstemp_inner creates files with the proper mode
452
453        file = self.do_create()
454        mode = stat.S_IMODE(os.stat(file.name).st_mode)
455        expected = 0o600
456        if sys.platform == 'win32':
457            # There's no distinction among 'user', 'group' and 'world';
458            # replicate the 'user' bits.
459            user = expected >> 6
460            expected = user * (1 + 8 + 64)
461        self.assertEqual(mode, expected)
462
463    @unittest.skipUnless(has_spawnl, 'os.spawnl not available')
464    @support.requires_subprocess()
465    def test_noinherit(self):
466        # _mkstemp_inner file handles are not inherited by child processes
467
468        if support.verbose:
469            v="v"
470        else:
471            v="q"
472
473        file = self.do_create()
474        self.assertEqual(os.get_inheritable(file.fd), False)
475        fd = "%d" % file.fd
476
477        try:
478            me = __file__
479        except NameError:
480            me = sys.argv[0]
481
482        # We have to exec something, so that FD_CLOEXEC will take
483        # effect.  The core of this test is therefore in
484        # tf_inherit_check.py, which see.
485        tester = os.path.join(os.path.dirname(os.path.abspath(me)),
486                              "tf_inherit_check.py")
487
488        # On Windows a spawn* /path/ with embedded spaces shouldn't be quoted,
489        # but an arg with embedded spaces should be decorated with double
490        # quotes on each end
491        if sys.platform == 'win32':
492            decorated = '"%s"' % sys.executable
493            tester = '"%s"' % tester
494        else:
495            decorated = sys.executable
496
497        retval = os.spawnl(os.P_WAIT, sys.executable, decorated, tester, v, fd)
498        self.assertFalse(retval < 0,
499                    "child process caught fatal signal %d" % -retval)
500        self.assertFalse(retval > 0, "child process reports failure %d"%retval)
501
502    @unittest.skipUnless(has_textmode, "text mode not available")
503    def test_textmode(self):
504        # _mkstemp_inner can create files in text mode
505
506        # A text file is truncated at the first Ctrl+Z byte
507        f = self.do_create(bin=0)
508        f.write(b"blat\x1a")
509        f.write(b"extra\n")
510        os.lseek(f.fd, 0, os.SEEK_SET)
511        self.assertEqual(os.read(f.fd, 20), b"blat")
512
513    def make_temp(self):
514        return tempfile._mkstemp_inner(tempfile.gettempdir(),
515                                       tempfile.gettempprefix(),
516                                       '',
517                                       tempfile._bin_openflags,
518                                       str)
519
520    def test_collision_with_existing_file(self):
521        # _mkstemp_inner tries another name when a file with
522        # the chosen name already exists
523        with _inside_empty_temp_dir(), \
524             _mock_candidate_names('aaa', 'aaa', 'bbb'):
525            (fd1, name1) = self.make_temp()
526            os.close(fd1)
527            self.assertTrue(name1.endswith('aaa'))
528
529            (fd2, name2) = self.make_temp()
530            os.close(fd2)
531            self.assertTrue(name2.endswith('bbb'))
532
533    def test_collision_with_existing_directory(self):
534        # _mkstemp_inner tries another name when a directory with
535        # the chosen name already exists
536        with _inside_empty_temp_dir(), \
537             _mock_candidate_names('aaa', 'aaa', 'bbb'):
538            dir = tempfile.mkdtemp()
539            self.assertTrue(dir.endswith('aaa'))
540
541            (fd, name) = self.make_temp()
542            os.close(fd)
543            self.assertTrue(name.endswith('bbb'))
544
545
546class TestGetTempPrefix(BaseTestCase):
547    """Test gettempprefix()."""
548
549    def test_sane_template(self):
550        # gettempprefix returns a nonempty prefix string
551        p = tempfile.gettempprefix()
552
553        self.assertIsInstance(p, str)
554        self.assertGreater(len(p), 0)
555
556        pb = tempfile.gettempprefixb()
557
558        self.assertIsInstance(pb, bytes)
559        self.assertGreater(len(pb), 0)
560
561    def test_usable_template(self):
562        # gettempprefix returns a usable prefix string
563
564        # Create a temp directory, avoiding use of the prefix.
565        # Then attempt to create a file whose name is
566        # prefix + 'xxxxxx.xxx' in that directory.
567        p = tempfile.gettempprefix() + "xxxxxx.xxx"
568        d = tempfile.mkdtemp(prefix="")
569        try:
570            p = os.path.join(d, p)
571            fd = os.open(p, os.O_RDWR | os.O_CREAT)
572            os.close(fd)
573            os.unlink(p)
574        finally:
575            os.rmdir(d)
576
577
578class TestGetTempDir(BaseTestCase):
579    """Test gettempdir()."""
580
581    def test_directory_exists(self):
582        # gettempdir returns a directory which exists
583
584        for d in (tempfile.gettempdir(), tempfile.gettempdirb()):
585            self.assertTrue(os.path.isabs(d) or d == os.curdir,
586                            "%r is not an absolute path" % d)
587            self.assertTrue(os.path.isdir(d),
588                            "%r is not a directory" % d)
589
590    def test_directory_writable(self):
591        # gettempdir returns a directory writable by the user
592
593        # sneaky: just instantiate a NamedTemporaryFile, which
594        # defaults to writing into the directory returned by
595        # gettempdir.
596        with tempfile.NamedTemporaryFile() as file:
597            file.write(b"blat")
598
599    def test_same_thing(self):
600        # gettempdir always returns the same object
601        a = tempfile.gettempdir()
602        b = tempfile.gettempdir()
603        c = tempfile.gettempdirb()
604
605        self.assertTrue(a is b)
606        self.assertNotEqual(type(a), type(c))
607        self.assertEqual(a, os.fsdecode(c))
608
609    def test_case_sensitive(self):
610        # gettempdir should not flatten its case
611        # even on a case-insensitive file system
612        case_sensitive_tempdir = tempfile.mkdtemp("-Temp")
613        _tempdir, tempfile.tempdir = tempfile.tempdir, None
614        try:
615            with os_helper.EnvironmentVarGuard() as env:
616                # Fake the first env var which is checked as a candidate
617                env["TMPDIR"] = case_sensitive_tempdir
618                self.assertEqual(tempfile.gettempdir(), case_sensitive_tempdir)
619        finally:
620            tempfile.tempdir = _tempdir
621            os_helper.rmdir(case_sensitive_tempdir)
622
623
624class TestMkstemp(BaseTestCase):
625    """Test mkstemp()."""
626
627    def do_create(self, dir=None, pre=None, suf=None):
628        output_type = tempfile._infer_return_type(dir, pre, suf)
629        if dir is None:
630            if output_type is str:
631                dir = tempfile.gettempdir()
632            else:
633                dir = tempfile.gettempdirb()
634        if pre is None:
635            pre = output_type()
636        if suf is None:
637            suf = output_type()
638        (fd, name) = tempfile.mkstemp(dir=dir, prefix=pre, suffix=suf)
639        (ndir, nbase) = os.path.split(name)
640        adir = os.path.abspath(dir)
641        self.assertEqual(adir, ndir,
642            "Directory '%s' incorrectly returned as '%s'" % (adir, ndir))
643
644        try:
645            self.nameCheck(name, dir, pre, suf)
646        finally:
647            os.close(fd)
648            os.unlink(name)
649
650    def test_basic(self):
651        # mkstemp can create files
652        self.do_create()
653        self.do_create(pre="a")
654        self.do_create(suf="b")
655        self.do_create(pre="a", suf="b")
656        self.do_create(pre="aa", suf=".txt")
657        self.do_create(dir=".")
658
659    def test_basic_with_bytes_names(self):
660        # mkstemp can create files when given name parts all
661        # specified as bytes.
662        d = tempfile.gettempdirb()
663        self.do_create(dir=d, suf=b"")
664        self.do_create(dir=d, pre=b"a")
665        self.do_create(dir=d, suf=b"b")
666        self.do_create(dir=d, pre=b"a", suf=b"b")
667        self.do_create(dir=d, pre=b"aa", suf=b".txt")
668        self.do_create(dir=b".")
669        with self.assertRaises(TypeError):
670            self.do_create(dir=".", pre=b"aa", suf=b".txt")
671        with self.assertRaises(TypeError):
672            self.do_create(dir=b".", pre="aa", suf=b".txt")
673        with self.assertRaises(TypeError):
674            self.do_create(dir=b".", pre=b"aa", suf=".txt")
675
676
677    def test_choose_directory(self):
678        # mkstemp can create directories in a user-selected directory
679        dir = tempfile.mkdtemp()
680        try:
681            self.do_create(dir=dir)
682            self.do_create(dir=pathlib.Path(dir))
683        finally:
684            os.rmdir(dir)
685
686    def test_for_tempdir_is_bytes_issue40701_api_warts(self):
687        orig_tempdir = tempfile.tempdir
688        self.assertIsInstance(tempfile.tempdir, (str, type(None)))
689        try:
690            fd, path = tempfile.mkstemp()
691            os.close(fd)
692            os.unlink(path)
693            self.assertIsInstance(path, str)
694            tempfile.tempdir = tempfile.gettempdirb()
695            self.assertIsInstance(tempfile.tempdir, bytes)
696            self.assertIsInstance(tempfile.gettempdir(), str)
697            self.assertIsInstance(tempfile.gettempdirb(), bytes)
698            fd, path = tempfile.mkstemp()
699            os.close(fd)
700            os.unlink(path)
701            self.assertIsInstance(path, bytes)
702            fd, path = tempfile.mkstemp(suffix='.txt')
703            os.close(fd)
704            os.unlink(path)
705            self.assertIsInstance(path, str)
706            fd, path = tempfile.mkstemp(prefix='test-temp-')
707            os.close(fd)
708            os.unlink(path)
709            self.assertIsInstance(path, str)
710            fd, path = tempfile.mkstemp(dir=tempfile.gettempdir())
711            os.close(fd)
712            os.unlink(path)
713            self.assertIsInstance(path, str)
714        finally:
715            tempfile.tempdir = orig_tempdir
716
717
718class TestMkdtemp(TestBadTempdir, BaseTestCase):
719    """Test mkdtemp()."""
720
721    def make_temp(self):
722        return tempfile.mkdtemp()
723
724    def do_create(self, dir=None, pre=None, suf=None):
725        output_type = tempfile._infer_return_type(dir, pre, suf)
726        if dir is None:
727            if output_type is str:
728                dir = tempfile.gettempdir()
729            else:
730                dir = tempfile.gettempdirb()
731        if pre is None:
732            pre = output_type()
733        if suf is None:
734            suf = output_type()
735        name = tempfile.mkdtemp(dir=dir, prefix=pre, suffix=suf)
736
737        try:
738            self.nameCheck(name, dir, pre, suf)
739            return name
740        except:
741            os.rmdir(name)
742            raise
743
744    def test_basic(self):
745        # mkdtemp can create directories
746        os.rmdir(self.do_create())
747        os.rmdir(self.do_create(pre="a"))
748        os.rmdir(self.do_create(suf="b"))
749        os.rmdir(self.do_create(pre="a", suf="b"))
750        os.rmdir(self.do_create(pre="aa", suf=".txt"))
751
752    def test_basic_with_bytes_names(self):
753        # mkdtemp can create directories when given all binary parts
754        d = tempfile.gettempdirb()
755        os.rmdir(self.do_create(dir=d))
756        os.rmdir(self.do_create(dir=d, pre=b"a"))
757        os.rmdir(self.do_create(dir=d, suf=b"b"))
758        os.rmdir(self.do_create(dir=d, pre=b"a", suf=b"b"))
759        os.rmdir(self.do_create(dir=d, pre=b"aa", suf=b".txt"))
760        with self.assertRaises(TypeError):
761            os.rmdir(self.do_create(dir=d, pre="aa", suf=b".txt"))
762        with self.assertRaises(TypeError):
763            os.rmdir(self.do_create(dir=d, pre=b"aa", suf=".txt"))
764        with self.assertRaises(TypeError):
765            os.rmdir(self.do_create(dir="", pre=b"aa", suf=b".txt"))
766
767    def test_basic_many(self):
768        # mkdtemp can create many directories (stochastic)
769        extant = list(range(TEST_FILES))
770        try:
771            for i in extant:
772                extant[i] = self.do_create(pre="aa")
773        finally:
774            for i in extant:
775                if(isinstance(i, str)):
776                    os.rmdir(i)
777
778    def test_choose_directory(self):
779        # mkdtemp can create directories in a user-selected directory
780        dir = tempfile.mkdtemp()
781        try:
782            os.rmdir(self.do_create(dir=dir))
783            os.rmdir(self.do_create(dir=pathlib.Path(dir)))
784        finally:
785            os.rmdir(dir)
786
787    @os_helper.skip_unless_working_chmod
788    def test_mode(self):
789        # mkdtemp creates directories with the proper mode
790
791        dir = self.do_create()
792        try:
793            mode = stat.S_IMODE(os.stat(dir).st_mode)
794            mode &= 0o777 # Mask off sticky bits inherited from /tmp
795            expected = 0o700
796            if sys.platform == 'win32':
797                # There's no distinction among 'user', 'group' and 'world';
798                # replicate the 'user' bits.
799                user = expected >> 6
800                expected = user * (1 + 8 + 64)
801            self.assertEqual(mode, expected)
802        finally:
803            os.rmdir(dir)
804
805    @unittest.skipUnless(os.name == "nt", "Only on Windows.")
806    def test_mode_win32(self):
807        # Use icacls.exe to extract the users with some level of access
808        # Main thing we are testing is that the BUILTIN\Users group has
809        # no access. The exact ACL is going to vary based on which user
810        # is running the test.
811        dir = self.do_create()
812        try:
813            out = subprocess.check_output(["icacls.exe", dir], encoding="oem").casefold()
814        finally:
815            os.rmdir(dir)
816
817        dir = dir.casefold()
818        users = set()
819        found_user = False
820        for line in out.strip().splitlines():
821            acl = None
822            # First line of result includes our directory
823            if line.startswith(dir):
824                acl = line.removeprefix(dir).strip()
825            elif line and line[:1].isspace():
826                acl = line.strip()
827            if acl:
828                users.add(acl.partition(":")[0])
829
830        self.assertNotIn(r"BUILTIN\Users".casefold(), users)
831
832    def test_collision_with_existing_file(self):
833        # mkdtemp tries another name when a file with
834        # the chosen name already exists
835        with _inside_empty_temp_dir(), \
836             _mock_candidate_names('aaa', 'aaa', 'bbb'):
837            file = tempfile.NamedTemporaryFile(delete=False)
838            file.close()
839            self.assertTrue(file.name.endswith('aaa'))
840            dir = tempfile.mkdtemp()
841            self.assertTrue(dir.endswith('bbb'))
842
843    def test_collision_with_existing_directory(self):
844        # mkdtemp tries another name when a directory with
845        # the chosen name already exists
846        with _inside_empty_temp_dir(), \
847             _mock_candidate_names('aaa', 'aaa', 'bbb'):
848            dir1 = tempfile.mkdtemp()
849            self.assertTrue(dir1.endswith('aaa'))
850            dir2 = tempfile.mkdtemp()
851            self.assertTrue(dir2.endswith('bbb'))
852
853    def test_for_tempdir_is_bytes_issue40701_api_warts(self):
854        orig_tempdir = tempfile.tempdir
855        self.assertIsInstance(tempfile.tempdir, (str, type(None)))
856        try:
857            path = tempfile.mkdtemp()
858            os.rmdir(path)
859            self.assertIsInstance(path, str)
860            tempfile.tempdir = tempfile.gettempdirb()
861            self.assertIsInstance(tempfile.tempdir, bytes)
862            self.assertIsInstance(tempfile.gettempdir(), str)
863            self.assertIsInstance(tempfile.gettempdirb(), bytes)
864            path = tempfile.mkdtemp()
865            os.rmdir(path)
866            self.assertIsInstance(path, bytes)
867            path = tempfile.mkdtemp(suffix='-dir')
868            os.rmdir(path)
869            self.assertIsInstance(path, str)
870            path = tempfile.mkdtemp(prefix='test-mkdtemp-')
871            os.rmdir(path)
872            self.assertIsInstance(path, str)
873            path = tempfile.mkdtemp(dir=tempfile.gettempdir())
874            os.rmdir(path)
875            self.assertIsInstance(path, str)
876        finally:
877            tempfile.tempdir = orig_tempdir
878
879
880class TestMktemp(BaseTestCase):
881    """Test mktemp()."""
882
883    # For safety, all use of mktemp must occur in a private directory.
884    # We must also suppress the RuntimeWarning it generates.
885    def setUp(self):
886        self.dir = tempfile.mkdtemp()
887        super().setUp()
888
889    def tearDown(self):
890        if self.dir:
891            os.rmdir(self.dir)
892            self.dir = None
893        super().tearDown()
894
895    class mktemped:
896        _unlink = os.unlink
897        _bflags = tempfile._bin_openflags
898
899        def __init__(self, dir, pre, suf):
900            self.name = tempfile.mktemp(dir=dir, prefix=pre, suffix=suf)
901            # Create the file.  This will raise an exception if it's
902            # mysteriously appeared in the meanwhile.
903            os.close(os.open(self.name, self._bflags, 0o600))
904
905        def __del__(self):
906            self._unlink(self.name)
907
908    def do_create(self, pre="", suf=""):
909        file = self.mktemped(self.dir, pre, suf)
910
911        self.nameCheck(file.name, self.dir, pre, suf)
912        return file
913
914    def test_basic(self):
915        # mktemp can choose usable file names
916        self.do_create()
917        self.do_create(pre="a")
918        self.do_create(suf="b")
919        self.do_create(pre="a", suf="b")
920        self.do_create(pre="aa", suf=".txt")
921
922    def test_many(self):
923        # mktemp can choose many usable file names (stochastic)
924        extant = list(range(TEST_FILES))
925        for i in extant:
926            extant[i] = self.do_create(pre="aa")
927        del extant
928        support.gc_collect()  # For PyPy or other GCs.
929
930##     def test_warning(self):
931##         # mktemp issues a warning when used
932##         warnings.filterwarnings("error",
933##                                 category=RuntimeWarning,
934##                                 message="mktemp")
935##         self.assertRaises(RuntimeWarning,
936##                           tempfile.mktemp, dir=self.dir)
937
938
939# We test _TemporaryFileWrapper by testing NamedTemporaryFile.
940
941
942class TestNamedTemporaryFile(BaseTestCase):
943    """Test NamedTemporaryFile()."""
944
945    def do_create(self, dir=None, pre="", suf="", delete=True):
946        if dir is None:
947            dir = tempfile.gettempdir()
948        file = tempfile.NamedTemporaryFile(dir=dir, prefix=pre, suffix=suf,
949                                           delete=delete)
950
951        self.nameCheck(file.name, dir, pre, suf)
952        return file
953
954
955    def test_basic(self):
956        # NamedTemporaryFile can create files
957        self.do_create()
958        self.do_create(pre="a")
959        self.do_create(suf="b")
960        self.do_create(pre="a", suf="b")
961        self.do_create(pre="aa", suf=".txt")
962
963    def test_method_lookup(self):
964        # Issue #18879: Looking up a temporary file method should keep it
965        # alive long enough.
966        f = self.do_create()
967        wr = weakref.ref(f)
968        write = f.write
969        write2 = f.write
970        del f
971        write(b'foo')
972        del write
973        write2(b'bar')
974        del write2
975        if support.check_impl_detail(cpython=True):
976            # No reference cycle was created.
977            self.assertIsNone(wr())
978
979    def test_iter(self):
980        # Issue #23700: getting iterator from a temporary file should keep
981        # it alive as long as it's being iterated over
982        lines = [b'spam\n', b'eggs\n', b'beans\n']
983        def make_file():
984            f = tempfile.NamedTemporaryFile(mode='w+b')
985            f.write(b''.join(lines))
986            f.seek(0)
987            return f
988        for i, l in enumerate(make_file()):
989            self.assertEqual(l, lines[i])
990        self.assertEqual(i, len(lines) - 1)
991
992    def test_creates_named(self):
993        # NamedTemporaryFile creates files with names
994        f = tempfile.NamedTemporaryFile()
995        self.assertTrue(os.path.exists(f.name),
996                        "NamedTemporaryFile %s does not exist" % f.name)
997
998    def test_del_on_close(self):
999        # A NamedTemporaryFile is deleted when closed
1000        dir = tempfile.mkdtemp()
1001        try:
1002            with tempfile.NamedTemporaryFile(dir=dir) as f:
1003                f.write(b'blat')
1004            self.assertEqual(os.listdir(dir), [])
1005            self.assertFalse(os.path.exists(f.name),
1006                        "NamedTemporaryFile %s exists after close" % f.name)
1007        finally:
1008            os.rmdir(dir)
1009
1010    def test_dis_del_on_close(self):
1011        # Tests that delete-on-close can be disabled
1012        dir = tempfile.mkdtemp()
1013        tmp = None
1014        try:
1015            f = tempfile.NamedTemporaryFile(dir=dir, delete=False)
1016            tmp = f.name
1017            f.write(b'blat')
1018            f.close()
1019            self.assertTrue(os.path.exists(f.name),
1020                        "NamedTemporaryFile %s missing after close" % f.name)
1021        finally:
1022            if tmp is not None:
1023                os.unlink(tmp)
1024            os.rmdir(dir)
1025
1026    def test_multiple_close(self):
1027        # A NamedTemporaryFile can be closed many times without error
1028        f = tempfile.NamedTemporaryFile()
1029        f.write(b'abc\n')
1030        f.close()
1031        f.close()
1032        f.close()
1033
1034    def test_context_manager(self):
1035        # A NamedTemporaryFile can be used as a context manager
1036        with tempfile.NamedTemporaryFile() as f:
1037            self.assertTrue(os.path.exists(f.name))
1038        self.assertFalse(os.path.exists(f.name))
1039        def use_closed():
1040            with f:
1041                pass
1042        self.assertRaises(ValueError, use_closed)
1043
1044    def test_bad_mode(self):
1045        dir = tempfile.mkdtemp()
1046        self.addCleanup(os_helper.rmtree, dir)
1047        with self.assertRaises(ValueError):
1048            tempfile.NamedTemporaryFile(mode='wr', dir=dir)
1049        with self.assertRaises(TypeError):
1050            tempfile.NamedTemporaryFile(mode=2, dir=dir)
1051        self.assertEqual(os.listdir(dir), [])
1052
1053    def test_bad_encoding(self):
1054        dir = tempfile.mkdtemp()
1055        self.addCleanup(os_helper.rmtree, dir)
1056        with self.assertRaises(LookupError):
1057            tempfile.NamedTemporaryFile('w', encoding='bad-encoding', dir=dir)
1058        self.assertEqual(os.listdir(dir), [])
1059
1060    def test_unexpected_error(self):
1061        dir = tempfile.mkdtemp()
1062        self.addCleanup(os_helper.rmtree, dir)
1063        with mock.patch('tempfile._TemporaryFileWrapper') as mock_ntf, \
1064             mock.patch('io.open', mock.mock_open()) as mock_open:
1065            mock_ntf.side_effect = KeyboardInterrupt()
1066            with self.assertRaises(KeyboardInterrupt):
1067                tempfile.NamedTemporaryFile(dir=dir)
1068        mock_open().close.assert_called()
1069        self.assertEqual(os.listdir(dir), [])
1070
1071    # How to test the mode and bufsize parameters?
1072
1073class TestSpooledTemporaryFile(BaseTestCase):
1074    """Test SpooledTemporaryFile()."""
1075
1076    def do_create(self, max_size=0, dir=None, pre="", suf=""):
1077        if dir is None:
1078            dir = tempfile.gettempdir()
1079        file = tempfile.SpooledTemporaryFile(max_size=max_size, dir=dir, prefix=pre, suffix=suf)
1080
1081        return file
1082
1083
1084    def test_basic(self):
1085        # SpooledTemporaryFile can create files
1086        f = self.do_create()
1087        self.assertFalse(f._rolled)
1088        f = self.do_create(max_size=100, pre="a", suf=".txt")
1089        self.assertFalse(f._rolled)
1090
1091    def test_is_iobase(self):
1092        # SpooledTemporaryFile should implement io.IOBase
1093        self.assertIsInstance(self.do_create(), io.IOBase)
1094
1095    def test_iobase_interface(self):
1096        # SpooledTemporaryFile should implement the io.IOBase interface.
1097        # Ensure it has all the required methods and properties.
1098        iobase_attrs = {
1099            # From IOBase
1100            'fileno', 'seek', 'truncate', 'close', 'closed', '__enter__',
1101            '__exit__', 'flush', 'isatty', '__iter__', '__next__', 'readable',
1102            'readline', 'readlines', 'seekable', 'tell', 'writable',
1103            'writelines',
1104            # From BufferedIOBase (binary mode) and TextIOBase (text mode)
1105            'detach', 'read', 'read1', 'write', 'readinto', 'readinto1',
1106            'encoding', 'errors', 'newlines',
1107        }
1108        spooledtempfile_attrs = set(dir(tempfile.SpooledTemporaryFile))
1109        missing_attrs = iobase_attrs - spooledtempfile_attrs
1110        self.assertFalse(
1111            missing_attrs,
1112            'SpooledTemporaryFile missing attributes from IOBase/BufferedIOBase/TextIOBase'
1113        )
1114
1115    def test_del_on_close(self):
1116        # A SpooledTemporaryFile is deleted when closed
1117        dir = tempfile.mkdtemp()
1118        try:
1119            f = tempfile.SpooledTemporaryFile(max_size=10, dir=dir)
1120            self.assertFalse(f._rolled)
1121            f.write(b'blat ' * 5)
1122            self.assertTrue(f._rolled)
1123            filename = f.name
1124            f.close()
1125            self.assertEqual(os.listdir(dir), [])
1126            if not isinstance(filename, int):
1127                self.assertFalse(os.path.exists(filename),
1128                    "SpooledTemporaryFile %s exists after close" % filename)
1129        finally:
1130            os.rmdir(dir)
1131
1132    def test_del_unrolled_file(self):
1133        # The unrolled SpooledTemporaryFile should raise a ResourceWarning
1134        # when deleted since the file was not explicitly closed.
1135        f = self.do_create(max_size=10)
1136        f.write(b'foo')
1137        self.assertEqual(f.name, None)  # Unrolled so no filename/fd
1138        with self.assertWarns(ResourceWarning):
1139            f.__del__()
1140
1141    @unittest.skipIf(
1142        support.is_emscripten, "Emscripten cannot fstat renamed files."
1143    )
1144    def test_del_rolled_file(self):
1145        # The rolled file should be deleted when the SpooledTemporaryFile
1146        # object is deleted. This should raise a ResourceWarning since the file
1147        # was not explicitly closed.
1148        f = self.do_create(max_size=2)
1149        f.write(b'foo')
1150        name = f.name  # This is a fd on posix+cygwin, a filename everywhere else
1151        self.assertTrue(os.path.exists(name))
1152        with self.assertWarns(ResourceWarning):
1153            f.__del__()
1154        self.assertFalse(
1155            os.path.exists(name),
1156            "Rolled SpooledTemporaryFile (name=%s) exists after delete" % name
1157        )
1158
1159    def test_rewrite_small(self):
1160        # A SpooledTemporaryFile can be written to multiple within the max_size
1161        f = self.do_create(max_size=30)
1162        self.assertFalse(f._rolled)
1163        for i in range(5):
1164            f.seek(0, 0)
1165            f.write(b'x' * 20)
1166        self.assertFalse(f._rolled)
1167
1168    def test_write_sequential(self):
1169        # A SpooledTemporaryFile should hold exactly max_size bytes, and roll
1170        # over afterward
1171        f = self.do_create(max_size=30)
1172        self.assertFalse(f._rolled)
1173        f.write(b'x' * 20)
1174        self.assertFalse(f._rolled)
1175        f.write(b'x' * 10)
1176        self.assertFalse(f._rolled)
1177        f.write(b'x')
1178        self.assertTrue(f._rolled)
1179
1180    def test_writelines(self):
1181        # Verify writelines with a SpooledTemporaryFile
1182        f = self.do_create()
1183        f.writelines((b'x', b'y', b'z'))
1184        pos = f.seek(0)
1185        self.assertEqual(pos, 0)
1186        buf = f.read()
1187        self.assertEqual(buf, b'xyz')
1188
1189    def test_writelines_sequential(self):
1190        # A SpooledTemporaryFile should hold exactly max_size bytes, and roll
1191        # over afterward
1192        f = self.do_create(max_size=35)
1193        f.writelines((b'x' * 20, b'x' * 10, b'x' * 5))
1194        self.assertFalse(f._rolled)
1195        f.write(b'x')
1196        self.assertTrue(f._rolled)
1197
1198    def test_sparse(self):
1199        # A SpooledTemporaryFile that is written late in the file will extend
1200        # when that occurs
1201        f = self.do_create(max_size=30)
1202        self.assertFalse(f._rolled)
1203        pos = f.seek(100, 0)
1204        self.assertEqual(pos, 100)
1205        self.assertFalse(f._rolled)
1206        f.write(b'x')
1207        self.assertTrue(f._rolled)
1208
1209    def test_fileno(self):
1210        # A SpooledTemporaryFile should roll over to a real file on fileno()
1211        f = self.do_create(max_size=30)
1212        self.assertFalse(f._rolled)
1213        self.assertTrue(f.fileno() > 0)
1214        self.assertTrue(f._rolled)
1215
1216    def test_multiple_close_before_rollover(self):
1217        # A SpooledTemporaryFile can be closed many times without error
1218        f = tempfile.SpooledTemporaryFile()
1219        f.write(b'abc\n')
1220        self.assertFalse(f._rolled)
1221        f.close()
1222        f.close()
1223        f.close()
1224
1225    def test_multiple_close_after_rollover(self):
1226        # A SpooledTemporaryFile can be closed many times without error
1227        f = tempfile.SpooledTemporaryFile(max_size=1)
1228        f.write(b'abc\n')
1229        self.assertTrue(f._rolled)
1230        f.close()
1231        f.close()
1232        f.close()
1233
1234    def test_bound_methods(self):
1235        # It should be OK to steal a bound method from a SpooledTemporaryFile
1236        # and use it independently; when the file rolls over, those bound
1237        # methods should continue to function
1238        f = self.do_create(max_size=30)
1239        read = f.read
1240        write = f.write
1241        seek = f.seek
1242
1243        write(b"a" * 35)
1244        write(b"b" * 35)
1245        seek(0, 0)
1246        self.assertEqual(read(70), b'a'*35 + b'b'*35)
1247
1248    def test_properties(self):
1249        f = tempfile.SpooledTemporaryFile(max_size=10)
1250        f.write(b'x' * 10)
1251        self.assertFalse(f._rolled)
1252        self.assertEqual(f.mode, 'w+b')
1253        self.assertIsNone(f.name)
1254        with self.assertRaises(AttributeError):
1255            f.newlines
1256        with self.assertRaises(AttributeError):
1257            f.encoding
1258        with self.assertRaises(AttributeError):
1259            f.errors
1260
1261        f.write(b'x')
1262        self.assertTrue(f._rolled)
1263        self.assertEqual(f.mode, 'rb+')
1264        self.assertIsNotNone(f.name)
1265        with self.assertRaises(AttributeError):
1266            f.newlines
1267        with self.assertRaises(AttributeError):
1268            f.encoding
1269        with self.assertRaises(AttributeError):
1270            f.errors
1271
1272    def test_text_mode(self):
1273        # Creating a SpooledTemporaryFile with a text mode should produce
1274        # a file object reading and writing (Unicode) text strings.
1275        f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10,
1276                                          encoding="utf-8")
1277        f.write("abc\n")
1278        f.seek(0)
1279        self.assertEqual(f.read(), "abc\n")
1280        f.write("def\n")
1281        f.seek(0)
1282        self.assertEqual(f.read(), "abc\ndef\n")
1283        self.assertFalse(f._rolled)
1284        self.assertEqual(f.mode, 'w+')
1285        self.assertIsNone(f.name)
1286        self.assertEqual(f.newlines, os.linesep)
1287        self.assertEqual(f.encoding, "utf-8")
1288        self.assertEqual(f.errors, "strict")
1289
1290        f.write("xyzzy\n")
1291        f.seek(0)
1292        self.assertEqual(f.read(), "abc\ndef\nxyzzy\n")
1293        # Check that Ctrl+Z doesn't truncate the file
1294        f.write("foo\x1abar\n")
1295        f.seek(0)
1296        self.assertEqual(f.read(), "abc\ndef\nxyzzy\nfoo\x1abar\n")
1297        self.assertTrue(f._rolled)
1298        self.assertEqual(f.mode, 'w+')
1299        self.assertIsNotNone(f.name)
1300        self.assertEqual(f.newlines, os.linesep)
1301        self.assertEqual(f.encoding, "utf-8")
1302        self.assertEqual(f.errors, "strict")
1303
1304    def test_text_newline_and_encoding(self):
1305        f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10,
1306                                          newline='', encoding='utf-8',
1307                                          errors='ignore')
1308        f.write("\u039B\r\n")
1309        f.seek(0)
1310        self.assertEqual(f.read(), "\u039B\r\n")
1311        self.assertFalse(f._rolled)
1312        self.assertEqual(f.mode, 'w+')
1313        self.assertIsNone(f.name)
1314        self.assertIsNotNone(f.newlines)
1315        self.assertEqual(f.encoding, "utf-8")
1316        self.assertEqual(f.errors, "ignore")
1317
1318        f.write("\u039C" * 10 + "\r\n")
1319        f.write("\u039D" * 20)
1320        f.seek(0)
1321        self.assertEqual(f.read(),
1322                "\u039B\r\n" + ("\u039C" * 10) + "\r\n" + ("\u039D" * 20))
1323        self.assertTrue(f._rolled)
1324        self.assertEqual(f.mode, 'w+')
1325        self.assertIsNotNone(f.name)
1326        self.assertIsNotNone(f.newlines)
1327        self.assertEqual(f.encoding, 'utf-8')
1328        self.assertEqual(f.errors, 'ignore')
1329
1330    def test_context_manager_before_rollover(self):
1331        # A SpooledTemporaryFile can be used as a context manager
1332        with tempfile.SpooledTemporaryFile(max_size=1) as f:
1333            self.assertFalse(f._rolled)
1334            self.assertFalse(f.closed)
1335        self.assertTrue(f.closed)
1336        def use_closed():
1337            with f:
1338                pass
1339        self.assertRaises(ValueError, use_closed)
1340
1341    def test_context_manager_during_rollover(self):
1342        # A SpooledTemporaryFile can be used as a context manager
1343        with tempfile.SpooledTemporaryFile(max_size=1) as f:
1344            self.assertFalse(f._rolled)
1345            f.write(b'abc\n')
1346            f.flush()
1347            self.assertTrue(f._rolled)
1348            self.assertFalse(f.closed)
1349        self.assertTrue(f.closed)
1350        def use_closed():
1351            with f:
1352                pass
1353        self.assertRaises(ValueError, use_closed)
1354
1355    def test_context_manager_after_rollover(self):
1356        # A SpooledTemporaryFile can be used as a context manager
1357        f = tempfile.SpooledTemporaryFile(max_size=1)
1358        f.write(b'abc\n')
1359        f.flush()
1360        self.assertTrue(f._rolled)
1361        with f:
1362            self.assertFalse(f.closed)
1363        self.assertTrue(f.closed)
1364        def use_closed():
1365            with f:
1366                pass
1367        self.assertRaises(ValueError, use_closed)
1368
1369    @unittest.skipIf(
1370        support.is_emscripten, "Emscripten cannot fstat renamed files."
1371    )
1372    def test_truncate_with_size_parameter(self):
1373        # A SpooledTemporaryFile can be truncated to zero size
1374        f = tempfile.SpooledTemporaryFile(max_size=10)
1375        f.write(b'abcdefg\n')
1376        f.seek(0)
1377        f.truncate()
1378        self.assertFalse(f._rolled)
1379        self.assertEqual(f._file.getvalue(), b'')
1380        # A SpooledTemporaryFile can be truncated to a specific size
1381        f = tempfile.SpooledTemporaryFile(max_size=10)
1382        f.write(b'abcdefg\n')
1383        f.truncate(4)
1384        self.assertFalse(f._rolled)
1385        self.assertEqual(f._file.getvalue(), b'abcd')
1386        # A SpooledTemporaryFile rolls over if truncated to large size
1387        f = tempfile.SpooledTemporaryFile(max_size=10)
1388        f.write(b'abcdefg\n')
1389        f.truncate(20)
1390        self.assertTrue(f._rolled)
1391        self.assertEqual(os.fstat(f.fileno()).st_size, 20)
1392
1393    def test_class_getitem(self):
1394        self.assertIsInstance(tempfile.SpooledTemporaryFile[bytes],
1395                      types.GenericAlias)
1396
1397if tempfile.NamedTemporaryFile is not tempfile.TemporaryFile:
1398
1399    class TestTemporaryFile(BaseTestCase):
1400        """Test TemporaryFile()."""
1401
1402        def test_basic(self):
1403            # TemporaryFile can create files
1404            # No point in testing the name params - the file has no name.
1405            tempfile.TemporaryFile()
1406
1407        def test_has_no_name(self):
1408            # TemporaryFile creates files with no names (on this system)
1409            dir = tempfile.mkdtemp()
1410            f = tempfile.TemporaryFile(dir=dir)
1411            f.write(b'blat')
1412
1413            # Sneaky: because this file has no name, it should not prevent
1414            # us from removing the directory it was created in.
1415            try:
1416                os.rmdir(dir)
1417            except:
1418                # cleanup
1419                f.close()
1420                os.rmdir(dir)
1421                raise
1422
1423        def test_multiple_close(self):
1424            # A TemporaryFile can be closed many times without error
1425            f = tempfile.TemporaryFile()
1426            f.write(b'abc\n')
1427            f.close()
1428            f.close()
1429            f.close()
1430
1431        # How to test the mode and bufsize parameters?
1432        def test_mode_and_encoding(self):
1433
1434            def roundtrip(input, *args, **kwargs):
1435                with tempfile.TemporaryFile(*args, **kwargs) as fileobj:
1436                    fileobj.write(input)
1437                    fileobj.seek(0)
1438                    self.assertEqual(input, fileobj.read())
1439
1440            roundtrip(b"1234", "w+b")
1441            roundtrip("abdc\n", "w+")
1442            roundtrip("\u039B", "w+", encoding="utf-16")
1443            roundtrip("foo\r\n", "w+", newline="")
1444
1445        def test_bad_mode(self):
1446            dir = tempfile.mkdtemp()
1447            self.addCleanup(os_helper.rmtree, dir)
1448            with self.assertRaises(ValueError):
1449                tempfile.TemporaryFile(mode='wr', dir=dir)
1450            with self.assertRaises(TypeError):
1451                tempfile.TemporaryFile(mode=2, dir=dir)
1452            self.assertEqual(os.listdir(dir), [])
1453
1454        def test_bad_encoding(self):
1455            dir = tempfile.mkdtemp()
1456            self.addCleanup(os_helper.rmtree, dir)
1457            with self.assertRaises(LookupError):
1458                tempfile.TemporaryFile('w', encoding='bad-encoding', dir=dir)
1459            self.assertEqual(os.listdir(dir), [])
1460
1461        def test_unexpected_error(self):
1462            dir = tempfile.mkdtemp()
1463            self.addCleanup(os_helper.rmtree, dir)
1464            with mock.patch('tempfile._O_TMPFILE_WORKS', False), \
1465                 mock.patch('os.unlink') as mock_unlink, \
1466                 mock.patch('os.open') as mock_open, \
1467                 mock.patch('os.close') as mock_close:
1468                mock_unlink.side_effect = KeyboardInterrupt()
1469                with self.assertRaises(KeyboardInterrupt):
1470                    tempfile.TemporaryFile(dir=dir)
1471            mock_close.assert_called()
1472            self.assertEqual(os.listdir(dir), [])
1473
1474
1475# Helper for test_del_on_shutdown
1476class NulledModules:
1477    def __init__(self, *modules):
1478        self.refs = [mod.__dict__ for mod in modules]
1479        self.contents = [ref.copy() for ref in self.refs]
1480
1481    def __enter__(self):
1482        for d in self.refs:
1483            for key in d:
1484                d[key] = None
1485
1486    def __exit__(self, *exc_info):
1487        for d, c in zip(self.refs, self.contents):
1488            d.clear()
1489            d.update(c)
1490
1491
1492class TestTemporaryDirectory(BaseTestCase):
1493    """Test TemporaryDirectory()."""
1494
1495    def do_create(self, dir=None, pre="", suf="", recurse=1, dirs=1, files=1,
1496                  ignore_cleanup_errors=False):
1497        if dir is None:
1498            dir = tempfile.gettempdir()
1499        tmp = tempfile.TemporaryDirectory(
1500            dir=dir, prefix=pre, suffix=suf,
1501            ignore_cleanup_errors=ignore_cleanup_errors)
1502        self.nameCheck(tmp.name, dir, pre, suf)
1503        self.do_create2(tmp.name, recurse, dirs, files)
1504        return tmp
1505
1506    def do_create2(self, path, recurse=1, dirs=1, files=1):
1507        # Create subdirectories and some files
1508        if recurse:
1509            for i in range(dirs):
1510                name = os.path.join(path, "dir%d" % i)
1511                os.mkdir(name)
1512                self.do_create2(name, recurse-1, dirs, files)
1513        for i in range(files):
1514            with open(os.path.join(path, "test%d.txt" % i), "wb") as f:
1515                f.write(b"Hello world!")
1516
1517    def test_mkdtemp_failure(self):
1518        # Check no additional exception if mkdtemp fails
1519        # Previously would raise AttributeError instead
1520        # (noted as part of Issue #10188)
1521        with tempfile.TemporaryDirectory() as nonexistent:
1522            pass
1523        with self.assertRaises(FileNotFoundError) as cm:
1524            tempfile.TemporaryDirectory(dir=nonexistent)
1525        self.assertEqual(cm.exception.errno, errno.ENOENT)
1526
1527    def test_explicit_cleanup(self):
1528        # A TemporaryDirectory is deleted when cleaned up
1529        dir = tempfile.mkdtemp()
1530        try:
1531            d = self.do_create(dir=dir)
1532            self.assertTrue(os.path.exists(d.name),
1533                            "TemporaryDirectory %s does not exist" % d.name)
1534            d.cleanup()
1535            self.assertFalse(os.path.exists(d.name),
1536                        "TemporaryDirectory %s exists after cleanup" % d.name)
1537        finally:
1538            os.rmdir(dir)
1539
1540    def test_explict_cleanup_ignore_errors(self):
1541        """Test that cleanup doesn't return an error when ignoring them."""
1542        with tempfile.TemporaryDirectory() as working_dir:
1543            temp_dir = self.do_create(
1544                dir=working_dir, ignore_cleanup_errors=True)
1545            temp_path = pathlib.Path(temp_dir.name)
1546            self.assertTrue(temp_path.exists(),
1547                            f"TemporaryDirectory {temp_path!s} does not exist")
1548            with open(temp_path / "a_file.txt", "w+t") as open_file:
1549                open_file.write("Hello world!\n")
1550                temp_dir.cleanup()
1551            self.assertEqual(len(list(temp_path.glob("*"))),
1552                             int(sys.platform.startswith("win")),
1553                             "Unexpected number of files in "
1554                             f"TemporaryDirectory {temp_path!s}")
1555            self.assertEqual(
1556                temp_path.exists(),
1557                sys.platform.startswith("win"),
1558                f"TemporaryDirectory {temp_path!s} existence state unexpected")
1559            temp_dir.cleanup()
1560            self.assertFalse(
1561                temp_path.exists(),
1562                f"TemporaryDirectory {temp_path!s} exists after cleanup")
1563
1564    @os_helper.skip_unless_symlink
1565    def test_cleanup_with_symlink_to_a_directory(self):
1566        # cleanup() should not follow symlinks to directories (issue #12464)
1567        d1 = self.do_create()
1568        d2 = self.do_create(recurse=0)
1569
1570        # Symlink d1/foo -> d2
1571        os.symlink(d2.name, os.path.join(d1.name, "foo"))
1572
1573        # This call to cleanup() should not follow the "foo" symlink
1574        d1.cleanup()
1575
1576        self.assertFalse(os.path.exists(d1.name),
1577                         "TemporaryDirectory %s exists after cleanup" % d1.name)
1578        self.assertTrue(os.path.exists(d2.name),
1579                        "Directory pointed to by a symlink was deleted")
1580        self.assertEqual(os.listdir(d2.name), ['test0.txt'],
1581                         "Contents of the directory pointed to by a symlink "
1582                         "were deleted")
1583        d2.cleanup()
1584
1585    @os_helper.skip_unless_symlink
1586    def test_cleanup_with_symlink_modes(self):
1587        # cleanup() should not follow symlinks when fixing mode bits (#91133)
1588        with self.do_create(recurse=0) as d2:
1589            file1 = os.path.join(d2, 'file1')
1590            open(file1, 'wb').close()
1591            dir1 = os.path.join(d2, 'dir1')
1592            os.mkdir(dir1)
1593            for mode in range(8):
1594                mode <<= 6
1595                with self.subTest(mode=format(mode, '03o')):
1596                    def test(target, target_is_directory):
1597                        d1 = self.do_create(recurse=0)
1598                        symlink = os.path.join(d1.name, 'symlink')
1599                        os.symlink(target, symlink,
1600                                target_is_directory=target_is_directory)
1601                        try:
1602                            os.chmod(symlink, mode, follow_symlinks=False)
1603                        except NotImplementedError:
1604                            pass
1605                        try:
1606                            os.chmod(symlink, mode)
1607                        except FileNotFoundError:
1608                            pass
1609                        os.chmod(d1.name, mode)
1610                        d1.cleanup()
1611                        self.assertFalse(os.path.exists(d1.name))
1612
1613                    with self.subTest('nonexisting file'):
1614                        test('nonexisting', target_is_directory=False)
1615                    with self.subTest('nonexisting dir'):
1616                        test('nonexisting', target_is_directory=True)
1617
1618                    with self.subTest('existing file'):
1619                        os.chmod(file1, mode)
1620                        old_mode = os.stat(file1).st_mode
1621                        test(file1, target_is_directory=False)
1622                        new_mode = os.stat(file1).st_mode
1623                        self.assertEqual(new_mode, old_mode,
1624                                         '%03o != %03o' % (new_mode, old_mode))
1625
1626                    with self.subTest('existing dir'):
1627                        os.chmod(dir1, mode)
1628                        old_mode = os.stat(dir1).st_mode
1629                        test(dir1, target_is_directory=True)
1630                        new_mode = os.stat(dir1).st_mode
1631                        self.assertEqual(new_mode, old_mode,
1632                                         '%03o != %03o' % (new_mode, old_mode))
1633
1634    @unittest.skipUnless(hasattr(os, 'chflags'), 'requires os.chflags')
1635    @os_helper.skip_unless_symlink
1636    def test_cleanup_with_symlink_flags(self):
1637        # cleanup() should not follow symlinks when fixing flags (#91133)
1638        flags = stat.UF_IMMUTABLE | stat.UF_NOUNLINK
1639        self.check_flags(flags)
1640
1641        with self.do_create(recurse=0) as d2:
1642            file1 = os.path.join(d2, 'file1')
1643            open(file1, 'wb').close()
1644            dir1 = os.path.join(d2, 'dir1')
1645            os.mkdir(dir1)
1646            def test(target, target_is_directory):
1647                d1 = self.do_create(recurse=0)
1648                symlink = os.path.join(d1.name, 'symlink')
1649                os.symlink(target, symlink,
1650                           target_is_directory=target_is_directory)
1651                try:
1652                    os.chflags(symlink, flags, follow_symlinks=False)
1653                except NotImplementedError:
1654                    pass
1655                try:
1656                    os.chflags(symlink, flags)
1657                except FileNotFoundError:
1658                    pass
1659                os.chflags(d1.name, flags)
1660                d1.cleanup()
1661                self.assertFalse(os.path.exists(d1.name))
1662
1663            with self.subTest('nonexisting file'):
1664                test('nonexisting', target_is_directory=False)
1665            with self.subTest('nonexisting dir'):
1666                test('nonexisting', target_is_directory=True)
1667
1668            with self.subTest('existing file'):
1669                os.chflags(file1, flags)
1670                old_flags = os.stat(file1).st_flags
1671                test(file1, target_is_directory=False)
1672                new_flags = os.stat(file1).st_flags
1673                self.assertEqual(new_flags, old_flags)
1674
1675            with self.subTest('existing dir'):
1676                os.chflags(dir1, flags)
1677                old_flags = os.stat(dir1).st_flags
1678                test(dir1, target_is_directory=True)
1679                new_flags = os.stat(dir1).st_flags
1680                self.assertEqual(new_flags, old_flags)
1681
1682    @support.cpython_only
1683    def test_del_on_collection(self):
1684        # A TemporaryDirectory is deleted when garbage collected
1685        dir = tempfile.mkdtemp()
1686        try:
1687            d = self.do_create(dir=dir)
1688            name = d.name
1689            del d # Rely on refcounting to invoke __del__
1690            self.assertFalse(os.path.exists(name),
1691                        "TemporaryDirectory %s exists after __del__" % name)
1692        finally:
1693            os.rmdir(dir)
1694
1695    @support.cpython_only
1696    def test_del_on_collection_ignore_errors(self):
1697        """Test that ignoring errors works when TemporaryDirectory is gced."""
1698        with tempfile.TemporaryDirectory() as working_dir:
1699            temp_dir = self.do_create(
1700                dir=working_dir, ignore_cleanup_errors=True)
1701            temp_path = pathlib.Path(temp_dir.name)
1702            self.assertTrue(temp_path.exists(),
1703                            f"TemporaryDirectory {temp_path!s} does not exist")
1704            with open(temp_path / "a_file.txt", "w+t") as open_file:
1705                open_file.write("Hello world!\n")
1706                del temp_dir
1707            self.assertEqual(len(list(temp_path.glob("*"))),
1708                             int(sys.platform.startswith("win")),
1709                             "Unexpected number of files in "
1710                             f"TemporaryDirectory {temp_path!s}")
1711            self.assertEqual(
1712                temp_path.exists(),
1713                sys.platform.startswith("win"),
1714                f"TemporaryDirectory {temp_path!s} existence state unexpected")
1715
1716    def test_del_on_shutdown(self):
1717        # A TemporaryDirectory may be cleaned up during shutdown
1718        with self.do_create() as dir:
1719            for mod in ('builtins', 'os', 'shutil', 'sys', 'tempfile', 'warnings'):
1720                code = """if True:
1721                    import builtins
1722                    import os
1723                    import shutil
1724                    import sys
1725                    import tempfile
1726                    import warnings
1727
1728                    tmp = tempfile.TemporaryDirectory(dir={dir!r})
1729                    sys.stdout.buffer.write(tmp.name.encode())
1730
1731                    tmp2 = os.path.join(tmp.name, 'test_dir')
1732                    os.mkdir(tmp2)
1733                    with open(os.path.join(tmp2, "test0.txt"), "w") as f:
1734                        f.write("Hello world!")
1735
1736                    {mod}.tmp = tmp
1737
1738                    warnings.filterwarnings("always", category=ResourceWarning)
1739                    """.format(dir=dir, mod=mod)
1740                rc, out, err = script_helper.assert_python_ok("-c", code)
1741                tmp_name = out.decode().strip()
1742                self.assertFalse(os.path.exists(tmp_name),
1743                            "TemporaryDirectory %s exists after cleanup" % tmp_name)
1744                err = err.decode('utf-8', 'backslashreplace')
1745                self.assertNotIn("Exception ", err)
1746                self.assertIn("ResourceWarning: Implicitly cleaning up", err)
1747
1748    def test_del_on_shutdown_ignore_errors(self):
1749        """Test ignoring errors works when a tempdir is gc'ed on shutdown."""
1750        with tempfile.TemporaryDirectory() as working_dir:
1751            code = """if True:
1752                import pathlib
1753                import sys
1754                import tempfile
1755                import warnings
1756
1757                temp_dir = tempfile.TemporaryDirectory(
1758                    dir={working_dir!r}, ignore_cleanup_errors=True)
1759                sys.stdout.buffer.write(temp_dir.name.encode())
1760
1761                temp_dir_2 = pathlib.Path(temp_dir.name) / "test_dir"
1762                temp_dir_2.mkdir()
1763                with open(temp_dir_2 / "test0.txt", "w") as test_file:
1764                    test_file.write("Hello world!")
1765                open_file = open(temp_dir_2 / "open_file.txt", "w")
1766                open_file.write("Hello world!")
1767
1768                warnings.filterwarnings("always", category=ResourceWarning)
1769                """.format(working_dir=working_dir)
1770            __, out, err = script_helper.assert_python_ok("-c", code)
1771            temp_path = pathlib.Path(out.decode().strip())
1772            self.assertEqual(len(list(temp_path.glob("*"))),
1773                             int(sys.platform.startswith("win")),
1774                             "Unexpected number of files in "
1775                             f"TemporaryDirectory {temp_path!s}")
1776            self.assertEqual(
1777                temp_path.exists(),
1778                sys.platform.startswith("win"),
1779                f"TemporaryDirectory {temp_path!s} existence state unexpected")
1780            err = err.decode('utf-8', 'backslashreplace')
1781            self.assertNotIn("Exception", err)
1782            self.assertNotIn("Error", err)
1783            self.assertIn("ResourceWarning: Implicitly cleaning up", err)
1784
1785    def test_exit_on_shutdown(self):
1786        # Issue #22427
1787        with self.do_create() as dir:
1788            code = """if True:
1789                import sys
1790                import tempfile
1791                import warnings
1792
1793                def generator():
1794                    with tempfile.TemporaryDirectory(dir={dir!r}) as tmp:
1795                        yield tmp
1796                g = generator()
1797                sys.stdout.buffer.write(next(g).encode())
1798
1799                warnings.filterwarnings("always", category=ResourceWarning)
1800                """.format(dir=dir)
1801            rc, out, err = script_helper.assert_python_ok("-c", code)
1802            tmp_name = out.decode().strip()
1803            self.assertFalse(os.path.exists(tmp_name),
1804                        "TemporaryDirectory %s exists after cleanup" % tmp_name)
1805            err = err.decode('utf-8', 'backslashreplace')
1806            self.assertNotIn("Exception ", err)
1807            self.assertIn("ResourceWarning: Implicitly cleaning up", err)
1808
1809    def test_warnings_on_cleanup(self):
1810        # ResourceWarning will be triggered by __del__
1811        with self.do_create() as dir:
1812            d = self.do_create(dir=dir, recurse=3)
1813            name = d.name
1814
1815            # Check for the resource warning
1816            with warnings_helper.check_warnings(('Implicitly',
1817                                                 ResourceWarning),
1818                                                quiet=False):
1819                warnings.filterwarnings("always", category=ResourceWarning)
1820                del d
1821                support.gc_collect()
1822            self.assertFalse(os.path.exists(name),
1823                        "TemporaryDirectory %s exists after __del__" % name)
1824
1825    def test_multiple_close(self):
1826        # Can be cleaned-up many times without error
1827        d = self.do_create()
1828        d.cleanup()
1829        d.cleanup()
1830        d.cleanup()
1831
1832    def test_context_manager(self):
1833        # Can be used as a context manager
1834        d = self.do_create()
1835        with d as name:
1836            self.assertTrue(os.path.exists(name))
1837            self.assertEqual(name, d.name)
1838        self.assertFalse(os.path.exists(name))
1839
1840    def test_modes(self):
1841        for mode in range(8):
1842            mode <<= 6
1843            with self.subTest(mode=format(mode, '03o')):
1844                d = self.do_create(recurse=3, dirs=2, files=2)
1845                with d:
1846                    # Change files and directories mode recursively.
1847                    for root, dirs, files in os.walk(d.name, topdown=False):
1848                        for name in files:
1849                            os.chmod(os.path.join(root, name), mode)
1850                        os.chmod(root, mode)
1851                    d.cleanup()
1852                self.assertFalse(os.path.exists(d.name))
1853
1854    def check_flags(self, flags):
1855        # skip the test if these flags are not supported (ex: FreeBSD 13)
1856        filename = os_helper.TESTFN
1857        try:
1858            open(filename, "w").close()
1859            try:
1860                os.chflags(filename, flags)
1861            except OSError as exc:
1862                # "OSError: [Errno 45] Operation not supported"
1863                self.skipTest(f"chflags() doesn't support flags "
1864                              f"{flags:#b}: {exc}")
1865            else:
1866                os.chflags(filename, 0)
1867        finally:
1868            os_helper.unlink(filename)
1869
1870    @unittest.skipUnless(hasattr(os, 'chflags'), 'requires os.chflags')
1871    def test_flags(self):
1872        flags = stat.UF_IMMUTABLE | stat.UF_NOUNLINK
1873        self.check_flags(flags)
1874
1875        d = self.do_create(recurse=3, dirs=2, files=2)
1876        with d:
1877            # Change files and directories flags recursively.
1878            for root, dirs, files in os.walk(d.name, topdown=False):
1879                for name in files:
1880                    os.chflags(os.path.join(root, name), flags)
1881                os.chflags(root, flags)
1882            d.cleanup()
1883        self.assertFalse(os.path.exists(d.name))
1884
1885
1886if __name__ == "__main__":
1887    unittest.main()
1888