1# tempfile.py unit tests. 2import tempfile 3import errno 4import io 5import os 6import pathlib 7import sys 8import re 9import warnings 10import contextlib 11import stat 12import types 13import weakref 14import gc 15import shutil 16import subprocess 17from unittest import mock 18 19import unittest 20from test import support 21from test.support import os_helper 22from test.support import script_helper 23from test.support import warnings_helper 24 25 26has_textmode = (tempfile._text_openflags != tempfile._bin_openflags) 27has_spawnl = hasattr(os, 'spawnl') 28 29# TEST_FILES may need to be tweaked for systems depending on the maximum 30# number of files that can be opened at one time (see ulimit -n) 31if sys.platform.startswith('openbsd'): 32 TEST_FILES = 48 33else: 34 TEST_FILES = 100 35 36# This is organized as one test for each chunk of code in tempfile.py, 37# in order of their appearance in the file. Testing which requires 38# threads is not done here. 39 40class TestLowLevelInternals(unittest.TestCase): 41 def test_infer_return_type_singles(self): 42 self.assertIs(str, tempfile._infer_return_type('')) 43 self.assertIs(bytes, tempfile._infer_return_type(b'')) 44 self.assertIs(str, tempfile._infer_return_type(None)) 45 46 def test_infer_return_type_multiples(self): 47 self.assertIs(str, tempfile._infer_return_type('', '')) 48 self.assertIs(bytes, tempfile._infer_return_type(b'', b'')) 49 with self.assertRaises(TypeError): 50 tempfile._infer_return_type('', b'') 51 with self.assertRaises(TypeError): 52 tempfile._infer_return_type(b'', '') 53 54 def test_infer_return_type_multiples_and_none(self): 55 self.assertIs(str, tempfile._infer_return_type(None, '')) 56 self.assertIs(str, tempfile._infer_return_type('', None)) 57 self.assertIs(str, tempfile._infer_return_type(None, None)) 58 self.assertIs(bytes, tempfile._infer_return_type(b'', None)) 59 self.assertIs(bytes, tempfile._infer_return_type(None, b'')) 60 with self.assertRaises(TypeError): 61 tempfile._infer_return_type('', None, b'') 62 with self.assertRaises(TypeError): 63 tempfile._infer_return_type(b'', None, '') 64 65 def test_infer_return_type_pathlib(self): 66 self.assertIs(str, tempfile._infer_return_type(os_helper.FakePath('/'))) 67 68 def test_infer_return_type_pathlike(self): 69 Path = os_helper.FakePath 70 self.assertIs(str, tempfile._infer_return_type(Path('/'))) 71 self.assertIs(bytes, tempfile._infer_return_type(Path(b'/'))) 72 self.assertIs(str, tempfile._infer_return_type('', Path(''))) 73 self.assertIs(bytes, tempfile._infer_return_type(b'', Path(b''))) 74 self.assertIs(bytes, tempfile._infer_return_type(None, Path(b''))) 75 self.assertIs(str, tempfile._infer_return_type(None, Path(''))) 76 77 with self.assertRaises(TypeError): 78 tempfile._infer_return_type('', Path(b'')) 79 with self.assertRaises(TypeError): 80 tempfile._infer_return_type(b'', Path('')) 81 82# Common functionality. 83 84class BaseTestCase(unittest.TestCase): 85 86 str_check = re.compile(r"^[a-z0-9_-]{8}$") 87 b_check = re.compile(br"^[a-z0-9_-]{8}$") 88 89 def setUp(self): 90 self.enterContext(warnings_helper.check_warnings()) 91 warnings.filterwarnings("ignore", category=RuntimeWarning, 92 message="mktemp", module=__name__) 93 94 def nameCheck(self, name, dir, pre, suf): 95 (ndir, nbase) = os.path.split(name) 96 npre = nbase[:len(pre)] 97 nsuf = nbase[len(nbase)-len(suf):] 98 99 if dir is not None: 100 self.assertIs( 101 type(name), 102 str 103 if type(dir) is str or isinstance(dir, os.PathLike) else 104 bytes, 105 "unexpected return type", 106 ) 107 if pre is not None: 108 self.assertIs(type(name), str if type(pre) is str else bytes, 109 "unexpected return type") 110 if suf is not None: 111 self.assertIs(type(name), str if type(suf) is str else bytes, 112 "unexpected return type") 113 if (dir, pre, suf) == (None, None, None): 114 self.assertIs(type(name), str, "default return type must be str") 115 116 # check for equality of the absolute paths! 117 self.assertEqual(os.path.abspath(ndir), os.path.abspath(dir), 118 "file %r not in directory %r" % (name, dir)) 119 self.assertEqual(npre, pre, 120 "file %r does not begin with %r" % (nbase, pre)) 121 self.assertEqual(nsuf, suf, 122 "file %r does not end with %r" % (nbase, suf)) 123 124 nbase = nbase[len(pre):len(nbase)-len(suf)] 125 check = self.str_check if isinstance(nbase, str) else self.b_check 126 self.assertTrue(check.match(nbase), 127 "random characters %r do not match %r" 128 % (nbase, check.pattern)) 129 130 131class TestExports(BaseTestCase): 132 def test_exports(self): 133 # There are no surprising symbols in the tempfile module 134 dict = tempfile.__dict__ 135 136 expected = { 137 "NamedTemporaryFile" : 1, 138 "TemporaryFile" : 1, 139 "mkstemp" : 1, 140 "mkdtemp" : 1, 141 "mktemp" : 1, 142 "TMP_MAX" : 1, 143 "gettempprefix" : 1, 144 "gettempprefixb" : 1, 145 "gettempdir" : 1, 146 "gettempdirb" : 1, 147 "tempdir" : 1, 148 "template" : 1, 149 "SpooledTemporaryFile" : 1, 150 "TemporaryDirectory" : 1, 151 } 152 153 unexp = [] 154 for key in dict: 155 if key[0] != '_' and key not in expected: 156 unexp.append(key) 157 self.assertTrue(len(unexp) == 0, 158 "unexpected keys: %s" % unexp) 159 160 161class TestRandomNameSequence(BaseTestCase): 162 """Test the internal iterator object _RandomNameSequence.""" 163 164 def setUp(self): 165 self.r = tempfile._RandomNameSequence() 166 super().setUp() 167 168 def test_get_eight_char_str(self): 169 # _RandomNameSequence returns a eight-character string 170 s = next(self.r) 171 self.nameCheck(s, '', '', '') 172 173 def test_many(self): 174 # _RandomNameSequence returns no duplicate strings (stochastic) 175 176 dict = {} 177 r = self.r 178 for i in range(TEST_FILES): 179 s = next(r) 180 self.nameCheck(s, '', '', '') 181 self.assertNotIn(s, dict) 182 dict[s] = 1 183 184 def supports_iter(self): 185 # _RandomNameSequence supports the iterator protocol 186 187 i = 0 188 r = self.r 189 for s in r: 190 i += 1 191 if i == 20: 192 break 193 194 @support.requires_fork() 195 def test_process_awareness(self): 196 # ensure that the random source differs between 197 # child and parent. 198 read_fd, write_fd = os.pipe() 199 pid = None 200 try: 201 pid = os.fork() 202 if not pid: 203 # child process 204 os.close(read_fd) 205 os.write(write_fd, next(self.r).encode("ascii")) 206 os.close(write_fd) 207 # bypass the normal exit handlers- leave those to 208 # the parent. 209 os._exit(0) 210 211 # parent process 212 parent_value = next(self.r) 213 child_value = os.read(read_fd, len(parent_value)).decode("ascii") 214 finally: 215 if pid: 216 support.wait_process(pid, exitcode=0) 217 218 os.close(read_fd) 219 os.close(write_fd) 220 self.assertNotEqual(child_value, parent_value) 221 222 223 224class TestCandidateTempdirList(BaseTestCase): 225 """Test the internal function _candidate_tempdir_list.""" 226 227 def test_nonempty_list(self): 228 # _candidate_tempdir_list returns a nonempty list of strings 229 230 cand = tempfile._candidate_tempdir_list() 231 232 self.assertFalse(len(cand) == 0) 233 for c in cand: 234 self.assertIsInstance(c, str) 235 236 def test_wanted_dirs(self): 237 # _candidate_tempdir_list contains the expected directories 238 239 # Make sure the interesting environment variables are all set. 240 with os_helper.EnvironmentVarGuard() as env: 241 for envname in 'TMPDIR', 'TEMP', 'TMP': 242 dirname = os.getenv(envname) 243 if not dirname: 244 env[envname] = os.path.abspath(envname) 245 246 cand = tempfile._candidate_tempdir_list() 247 248 for envname in 'TMPDIR', 'TEMP', 'TMP': 249 dirname = os.getenv(envname) 250 if not dirname: raise ValueError 251 self.assertIn(dirname, cand) 252 253 try: 254 dirname = os.getcwd() 255 except (AttributeError, OSError): 256 dirname = os.curdir 257 258 self.assertIn(dirname, cand) 259 260 # Not practical to try to verify the presence of OS-specific 261 # paths in this list. 262 263 264# We test _get_default_tempdir some more by testing gettempdir. 265 266class TestGetDefaultTempdir(BaseTestCase): 267 """Test _get_default_tempdir().""" 268 269 def test_no_files_left_behind(self): 270 # use a private empty directory 271 with tempfile.TemporaryDirectory() as our_temp_directory: 272 # force _get_default_tempdir() to consider our empty directory 273 def our_candidate_list(): 274 return [our_temp_directory] 275 276 with support.swap_attr(tempfile, "_candidate_tempdir_list", 277 our_candidate_list): 278 # verify our directory is empty after _get_default_tempdir() 279 tempfile._get_default_tempdir() 280 self.assertEqual(os.listdir(our_temp_directory), []) 281 282 def raise_OSError(*args, **kwargs): 283 raise OSError() 284 285 with support.swap_attr(os, "open", raise_OSError): 286 # test again with failing os.open() 287 with self.assertRaises(FileNotFoundError): 288 tempfile._get_default_tempdir() 289 self.assertEqual(os.listdir(our_temp_directory), []) 290 291 with support.swap_attr(os, "write", raise_OSError): 292 # test again with failing os.write() 293 with self.assertRaises(FileNotFoundError): 294 tempfile._get_default_tempdir() 295 self.assertEqual(os.listdir(our_temp_directory), []) 296 297 298class TestGetCandidateNames(BaseTestCase): 299 """Test the internal function _get_candidate_names.""" 300 301 def test_retval(self): 302 # _get_candidate_names returns a _RandomNameSequence object 303 obj = tempfile._get_candidate_names() 304 self.assertIsInstance(obj, tempfile._RandomNameSequence) 305 306 def test_same_thing(self): 307 # _get_candidate_names always returns the same object 308 a = tempfile._get_candidate_names() 309 b = tempfile._get_candidate_names() 310 311 self.assertTrue(a is b) 312 313 314@contextlib.contextmanager 315def _inside_empty_temp_dir(): 316 dir = tempfile.mkdtemp() 317 try: 318 with support.swap_attr(tempfile, 'tempdir', dir): 319 yield 320 finally: 321 os_helper.rmtree(dir) 322 323 324def _mock_candidate_names(*names): 325 return support.swap_attr(tempfile, 326 '_get_candidate_names', 327 lambda: iter(names)) 328 329 330class TestBadTempdir: 331 332 @unittest.skipIf( 333 support.is_emscripten, "Emscripten cannot remove write bits." 334 ) 335 def test_read_only_directory(self): 336 with _inside_empty_temp_dir(): 337 oldmode = mode = os.stat(tempfile.tempdir).st_mode 338 mode &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH) 339 os.chmod(tempfile.tempdir, mode) 340 try: 341 if os.access(tempfile.tempdir, os.W_OK): 342 self.skipTest("can't set the directory read-only") 343 with self.assertRaises(PermissionError): 344 self.make_temp() 345 self.assertEqual(os.listdir(tempfile.tempdir), []) 346 finally: 347 os.chmod(tempfile.tempdir, oldmode) 348 349 def test_nonexisting_directory(self): 350 with _inside_empty_temp_dir(): 351 tempdir = os.path.join(tempfile.tempdir, 'nonexistent') 352 with support.swap_attr(tempfile, 'tempdir', tempdir): 353 with self.assertRaises(FileNotFoundError): 354 self.make_temp() 355 356 def test_non_directory(self): 357 with _inside_empty_temp_dir(): 358 tempdir = os.path.join(tempfile.tempdir, 'file') 359 open(tempdir, 'wb').close() 360 with support.swap_attr(tempfile, 'tempdir', tempdir): 361 with self.assertRaises((NotADirectoryError, FileNotFoundError)): 362 self.make_temp() 363 364 365class TestMkstempInner(TestBadTempdir, BaseTestCase): 366 """Test the internal function _mkstemp_inner.""" 367 368 class mkstemped: 369 _bflags = tempfile._bin_openflags 370 _tflags = tempfile._text_openflags 371 _close = os.close 372 _unlink = os.unlink 373 374 def __init__(self, dir, pre, suf, bin): 375 if bin: flags = self._bflags 376 else: flags = self._tflags 377 378 output_type = tempfile._infer_return_type(dir, pre, suf) 379 (self.fd, self.name) = tempfile._mkstemp_inner(dir, pre, suf, flags, output_type) 380 381 def write(self, str): 382 os.write(self.fd, str) 383 384 def __del__(self): 385 self._close(self.fd) 386 self._unlink(self.name) 387 388 def do_create(self, dir=None, pre=None, suf=None, bin=1): 389 output_type = tempfile._infer_return_type(dir, pre, suf) 390 if dir is None: 391 if output_type is str: 392 dir = tempfile.gettempdir() 393 else: 394 dir = tempfile.gettempdirb() 395 if pre is None: 396 pre = output_type() 397 if suf is None: 398 suf = output_type() 399 file = self.mkstemped(dir, pre, suf, bin) 400 401 self.nameCheck(file.name, dir, pre, suf) 402 return file 403 404 def test_basic(self): 405 # _mkstemp_inner can create files 406 self.do_create().write(b"blat") 407 self.do_create(pre="a").write(b"blat") 408 self.do_create(suf="b").write(b"blat") 409 self.do_create(pre="a", suf="b").write(b"blat") 410 self.do_create(pre="aa", suf=".txt").write(b"blat") 411 412 def test_basic_with_bytes_names(self): 413 # _mkstemp_inner can create files when given name parts all 414 # specified as bytes. 415 dir_b = tempfile.gettempdirb() 416 self.do_create(dir=dir_b, suf=b"").write(b"blat") 417 self.do_create(dir=dir_b, pre=b"a").write(b"blat") 418 self.do_create(dir=dir_b, suf=b"b").write(b"blat") 419 self.do_create(dir=dir_b, pre=b"a", suf=b"b").write(b"blat") 420 self.do_create(dir=dir_b, pre=b"aa", suf=b".txt").write(b"blat") 421 # Can't mix str & binary types in the args. 422 with self.assertRaises(TypeError): 423 self.do_create(dir="", suf=b"").write(b"blat") 424 with self.assertRaises(TypeError): 425 self.do_create(dir=dir_b, pre="").write(b"blat") 426 with self.assertRaises(TypeError): 427 self.do_create(dir=dir_b, pre=b"", suf="").write(b"blat") 428 429 def test_basic_many(self): 430 # _mkstemp_inner can create many files (stochastic) 431 extant = list(range(TEST_FILES)) 432 for i in extant: 433 extant[i] = self.do_create(pre="aa") 434 435 def test_choose_directory(self): 436 # _mkstemp_inner can create files in a user-selected directory 437 dir = tempfile.mkdtemp() 438 try: 439 self.do_create(dir=dir).write(b"blat") 440 self.do_create(dir=os_helper.FakePath(dir)).write(b"blat") 441 finally: 442 support.gc_collect() # For PyPy or other GCs. 443 os.rmdir(dir) 444 445 @os_helper.skip_unless_working_chmod 446 def test_file_mode(self): 447 # _mkstemp_inner creates files with the proper mode 448 449 file = self.do_create() 450 mode = stat.S_IMODE(os.stat(file.name).st_mode) 451 expected = 0o600 452 if sys.platform == 'win32': 453 # There's no distinction among 'user', 'group' and 'world'; 454 # replicate the 'user' bits. 455 user = expected >> 6 456 expected = user * (1 + 8 + 64) 457 self.assertEqual(mode, expected) 458 459 @unittest.skipUnless(has_spawnl, 'os.spawnl not available') 460 @support.requires_subprocess() 461 def test_noinherit(self): 462 # _mkstemp_inner file handles are not inherited by child processes 463 464 if support.verbose: 465 v="v" 466 else: 467 v="q" 468 469 file = self.do_create() 470 self.assertEqual(os.get_inheritable(file.fd), False) 471 fd = "%d" % file.fd 472 473 try: 474 me = __file__ 475 except NameError: 476 me = sys.argv[0] 477 478 # We have to exec something, so that FD_CLOEXEC will take 479 # effect. The core of this test is therefore in 480 # tf_inherit_check.py, which see. 481 tester = os.path.join(os.path.dirname(os.path.abspath(me)), 482 "tf_inherit_check.py") 483 484 # On Windows a spawn* /path/ with embedded spaces shouldn't be quoted, 485 # but an arg with embedded spaces should be decorated with double 486 # quotes on each end 487 if sys.platform == 'win32': 488 decorated = '"%s"' % sys.executable 489 tester = '"%s"' % tester 490 else: 491 decorated = sys.executable 492 493 retval = os.spawnl(os.P_WAIT, sys.executable, decorated, tester, v, fd) 494 self.assertFalse(retval < 0, 495 "child process caught fatal signal %d" % -retval) 496 self.assertFalse(retval > 0, "child process reports failure %d"%retval) 497 498 @unittest.skipUnless(has_textmode, "text mode not available") 499 def test_textmode(self): 500 # _mkstemp_inner can create files in text mode 501 502 # A text file is truncated at the first Ctrl+Z byte 503 f = self.do_create(bin=0) 504 f.write(b"blat\x1a") 505 f.write(b"extra\n") 506 os.lseek(f.fd, 0, os.SEEK_SET) 507 self.assertEqual(os.read(f.fd, 20), b"blat") 508 509 def make_temp(self): 510 return tempfile._mkstemp_inner(tempfile.gettempdir(), 511 tempfile.gettempprefix(), 512 '', 513 tempfile._bin_openflags, 514 str) 515 516 def test_collision_with_existing_file(self): 517 # _mkstemp_inner tries another name when a file with 518 # the chosen name already exists 519 with _inside_empty_temp_dir(), \ 520 _mock_candidate_names('aaa', 'aaa', 'bbb'): 521 (fd1, name1) = self.make_temp() 522 os.close(fd1) 523 self.assertTrue(name1.endswith('aaa')) 524 525 (fd2, name2) = self.make_temp() 526 os.close(fd2) 527 self.assertTrue(name2.endswith('bbb')) 528 529 def test_collision_with_existing_directory(self): 530 # _mkstemp_inner tries another name when a directory with 531 # the chosen name already exists 532 with _inside_empty_temp_dir(), \ 533 _mock_candidate_names('aaa', 'aaa', 'bbb'): 534 dir = tempfile.mkdtemp() 535 self.assertTrue(dir.endswith('aaa')) 536 537 (fd, name) = self.make_temp() 538 os.close(fd) 539 self.assertTrue(name.endswith('bbb')) 540 541 542class TestGetTempPrefix(BaseTestCase): 543 """Test gettempprefix().""" 544 545 def test_sane_template(self): 546 # gettempprefix returns a nonempty prefix string 547 p = tempfile.gettempprefix() 548 549 self.assertIsInstance(p, str) 550 self.assertGreater(len(p), 0) 551 552 pb = tempfile.gettempprefixb() 553 554 self.assertIsInstance(pb, bytes) 555 self.assertGreater(len(pb), 0) 556 557 def test_usable_template(self): 558 # gettempprefix returns a usable prefix string 559 560 # Create a temp directory, avoiding use of the prefix. 561 # Then attempt to create a file whose name is 562 # prefix + 'xxxxxx.xxx' in that directory. 563 p = tempfile.gettempprefix() + "xxxxxx.xxx" 564 d = tempfile.mkdtemp(prefix="") 565 try: 566 p = os.path.join(d, p) 567 fd = os.open(p, os.O_RDWR | os.O_CREAT) 568 os.close(fd) 569 os.unlink(p) 570 finally: 571 os.rmdir(d) 572 573 574class TestGetTempDir(BaseTestCase): 575 """Test gettempdir().""" 576 577 def test_directory_exists(self): 578 # gettempdir returns a directory which exists 579 580 for d in (tempfile.gettempdir(), tempfile.gettempdirb()): 581 self.assertTrue(os.path.isabs(d) or d == os.curdir, 582 "%r is not an absolute path" % d) 583 self.assertTrue(os.path.isdir(d), 584 "%r is not a directory" % d) 585 586 def test_directory_writable(self): 587 # gettempdir returns a directory writable by the user 588 589 # sneaky: just instantiate a NamedTemporaryFile, which 590 # defaults to writing into the directory returned by 591 # gettempdir. 592 with tempfile.NamedTemporaryFile() as file: 593 file.write(b"blat") 594 595 def test_same_thing(self): 596 # gettempdir always returns the same object 597 a = tempfile.gettempdir() 598 b = tempfile.gettempdir() 599 c = tempfile.gettempdirb() 600 601 self.assertTrue(a is b) 602 self.assertNotEqual(type(a), type(c)) 603 self.assertEqual(a, os.fsdecode(c)) 604 605 def test_case_sensitive(self): 606 # gettempdir should not flatten its case 607 # even on a case-insensitive file system 608 case_sensitive_tempdir = tempfile.mkdtemp("-Temp") 609 _tempdir, tempfile.tempdir = tempfile.tempdir, None 610 try: 611 with os_helper.EnvironmentVarGuard() as env: 612 # Fake the first env var which is checked as a candidate 613 env["TMPDIR"] = case_sensitive_tempdir 614 self.assertEqual(tempfile.gettempdir(), case_sensitive_tempdir) 615 finally: 616 tempfile.tempdir = _tempdir 617 os_helper.rmdir(case_sensitive_tempdir) 618 619 620class TestMkstemp(BaseTestCase): 621 """Test mkstemp().""" 622 623 def do_create(self, dir=None, pre=None, suf=None): 624 output_type = tempfile._infer_return_type(dir, pre, suf) 625 if dir is None: 626 if output_type is str: 627 dir = tempfile.gettempdir() 628 else: 629 dir = tempfile.gettempdirb() 630 if pre is None: 631 pre = output_type() 632 if suf is None: 633 suf = output_type() 634 (fd, name) = tempfile.mkstemp(dir=dir, prefix=pre, suffix=suf) 635 (ndir, nbase) = os.path.split(name) 636 adir = os.path.abspath(dir) 637 self.assertEqual(adir, ndir, 638 "Directory '%s' incorrectly returned as '%s'" % (adir, ndir)) 639 640 try: 641 self.nameCheck(name, dir, pre, suf) 642 finally: 643 os.close(fd) 644 os.unlink(name) 645 646 def test_basic(self): 647 # mkstemp can create files 648 self.do_create() 649 self.do_create(pre="a") 650 self.do_create(suf="b") 651 self.do_create(pre="a", suf="b") 652 self.do_create(pre="aa", suf=".txt") 653 self.do_create(dir=".") 654 655 def test_basic_with_bytes_names(self): 656 # mkstemp can create files when given name parts all 657 # specified as bytes. 658 d = tempfile.gettempdirb() 659 self.do_create(dir=d, suf=b"") 660 self.do_create(dir=d, pre=b"a") 661 self.do_create(dir=d, suf=b"b") 662 self.do_create(dir=d, pre=b"a", suf=b"b") 663 self.do_create(dir=d, pre=b"aa", suf=b".txt") 664 self.do_create(dir=b".") 665 with self.assertRaises(TypeError): 666 self.do_create(dir=".", pre=b"aa", suf=b".txt") 667 with self.assertRaises(TypeError): 668 self.do_create(dir=b".", pre="aa", suf=b".txt") 669 with self.assertRaises(TypeError): 670 self.do_create(dir=b".", pre=b"aa", suf=".txt") 671 672 673 def test_choose_directory(self): 674 # mkstemp can create directories in a user-selected directory 675 dir = tempfile.mkdtemp() 676 try: 677 self.do_create(dir=dir) 678 self.do_create(dir=os_helper.FakePath(dir)) 679 finally: 680 os.rmdir(dir) 681 682 def test_for_tempdir_is_bytes_issue40701_api_warts(self): 683 orig_tempdir = tempfile.tempdir 684 self.assertIsInstance(tempfile.tempdir, (str, type(None))) 685 try: 686 fd, path = tempfile.mkstemp() 687 os.close(fd) 688 os.unlink(path) 689 self.assertIsInstance(path, str) 690 tempfile.tempdir = tempfile.gettempdirb() 691 self.assertIsInstance(tempfile.tempdir, bytes) 692 self.assertIsInstance(tempfile.gettempdir(), str) 693 self.assertIsInstance(tempfile.gettempdirb(), bytes) 694 fd, path = tempfile.mkstemp() 695 os.close(fd) 696 os.unlink(path) 697 self.assertIsInstance(path, bytes) 698 fd, path = tempfile.mkstemp(suffix='.txt') 699 os.close(fd) 700 os.unlink(path) 701 self.assertIsInstance(path, str) 702 fd, path = tempfile.mkstemp(prefix='test-temp-') 703 os.close(fd) 704 os.unlink(path) 705 self.assertIsInstance(path, str) 706 fd, path = tempfile.mkstemp(dir=tempfile.gettempdir()) 707 os.close(fd) 708 os.unlink(path) 709 self.assertIsInstance(path, str) 710 finally: 711 tempfile.tempdir = orig_tempdir 712 713 714class TestMkdtemp(TestBadTempdir, BaseTestCase): 715 """Test mkdtemp().""" 716 717 def make_temp(self): 718 return tempfile.mkdtemp() 719 720 def do_create(self, dir=None, pre=None, suf=None): 721 output_type = tempfile._infer_return_type(dir, pre, suf) 722 if dir is None: 723 if output_type is str: 724 dir = tempfile.gettempdir() 725 else: 726 dir = tempfile.gettempdirb() 727 if pre is None: 728 pre = output_type() 729 if suf is None: 730 suf = output_type() 731 name = tempfile.mkdtemp(dir=dir, prefix=pre, suffix=suf) 732 733 try: 734 self.nameCheck(name, dir, pre, suf) 735 return name 736 except: 737 os.rmdir(name) 738 raise 739 740 def test_basic(self): 741 # mkdtemp can create directories 742 os.rmdir(self.do_create()) 743 os.rmdir(self.do_create(pre="a")) 744 os.rmdir(self.do_create(suf="b")) 745 os.rmdir(self.do_create(pre="a", suf="b")) 746 os.rmdir(self.do_create(pre="aa", suf=".txt")) 747 748 def test_basic_with_bytes_names(self): 749 # mkdtemp can create directories when given all binary parts 750 d = tempfile.gettempdirb() 751 os.rmdir(self.do_create(dir=d)) 752 os.rmdir(self.do_create(dir=d, pre=b"a")) 753 os.rmdir(self.do_create(dir=d, suf=b"b")) 754 os.rmdir(self.do_create(dir=d, pre=b"a", suf=b"b")) 755 os.rmdir(self.do_create(dir=d, pre=b"aa", suf=b".txt")) 756 with self.assertRaises(TypeError): 757 os.rmdir(self.do_create(dir=d, pre="aa", suf=b".txt")) 758 with self.assertRaises(TypeError): 759 os.rmdir(self.do_create(dir=d, pre=b"aa", suf=".txt")) 760 with self.assertRaises(TypeError): 761 os.rmdir(self.do_create(dir="", pre=b"aa", suf=b".txt")) 762 763 def test_basic_many(self): 764 # mkdtemp can create many directories (stochastic) 765 extant = list(range(TEST_FILES)) 766 try: 767 for i in extant: 768 extant[i] = self.do_create(pre="aa") 769 finally: 770 for i in extant: 771 if(isinstance(i, str)): 772 os.rmdir(i) 773 774 def test_choose_directory(self): 775 # mkdtemp can create directories in a user-selected directory 776 dir = tempfile.mkdtemp() 777 try: 778 os.rmdir(self.do_create(dir=dir)) 779 os.rmdir(self.do_create(dir=os_helper.FakePath(dir))) 780 finally: 781 os.rmdir(dir) 782 783 @os_helper.skip_unless_working_chmod 784 def test_mode(self): 785 # mkdtemp creates directories with the proper mode 786 787 dir = self.do_create() 788 try: 789 mode = stat.S_IMODE(os.stat(dir).st_mode) 790 mode &= 0o777 # Mask off sticky bits inherited from /tmp 791 expected = 0o700 792 if sys.platform == 'win32': 793 # There's no distinction among 'user', 'group' and 'world'; 794 # replicate the 'user' bits. 795 user = expected >> 6 796 expected = user * (1 + 8 + 64) 797 self.assertEqual(mode, expected) 798 finally: 799 os.rmdir(dir) 800 801 @unittest.skipUnless(os.name == "nt", "Only on Windows.") 802 def test_mode_win32(self): 803 # Use icacls.exe to extract the users with some level of access 804 # Main thing we are testing is that the BUILTIN\Users group has 805 # no access. The exact ACL is going to vary based on which user 806 # is running the test. 807 dir = self.do_create() 808 try: 809 out = subprocess.check_output(["icacls.exe", dir], encoding="oem").casefold() 810 finally: 811 os.rmdir(dir) 812 813 dir = dir.casefold() 814 users = set() 815 found_user = False 816 for line in out.strip().splitlines(): 817 acl = None 818 # First line of result includes our directory 819 if line.startswith(dir): 820 acl = line.removeprefix(dir).strip() 821 elif line and line[:1].isspace(): 822 acl = line.strip() 823 if acl: 824 users.add(acl.partition(":")[0]) 825 826 self.assertNotIn(r"BUILTIN\Users".casefold(), users) 827 828 def test_collision_with_existing_file(self): 829 # mkdtemp tries another name when a file with 830 # the chosen name already exists 831 with _inside_empty_temp_dir(), \ 832 _mock_candidate_names('aaa', 'aaa', 'bbb'): 833 file = tempfile.NamedTemporaryFile(delete=False) 834 file.close() 835 self.assertTrue(file.name.endswith('aaa')) 836 dir = tempfile.mkdtemp() 837 self.assertTrue(dir.endswith('bbb')) 838 839 def test_collision_with_existing_directory(self): 840 # mkdtemp tries another name when a directory with 841 # the chosen name already exists 842 with _inside_empty_temp_dir(), \ 843 _mock_candidate_names('aaa', 'aaa', 'bbb'): 844 dir1 = tempfile.mkdtemp() 845 self.assertTrue(dir1.endswith('aaa')) 846 dir2 = tempfile.mkdtemp() 847 self.assertTrue(dir2.endswith('bbb')) 848 849 def test_for_tempdir_is_bytes_issue40701_api_warts(self): 850 orig_tempdir = tempfile.tempdir 851 self.assertIsInstance(tempfile.tempdir, (str, type(None))) 852 try: 853 path = tempfile.mkdtemp() 854 os.rmdir(path) 855 self.assertIsInstance(path, str) 856 tempfile.tempdir = tempfile.gettempdirb() 857 self.assertIsInstance(tempfile.tempdir, bytes) 858 self.assertIsInstance(tempfile.gettempdir(), str) 859 self.assertIsInstance(tempfile.gettempdirb(), bytes) 860 path = tempfile.mkdtemp() 861 os.rmdir(path) 862 self.assertIsInstance(path, bytes) 863 path = tempfile.mkdtemp(suffix='-dir') 864 os.rmdir(path) 865 self.assertIsInstance(path, str) 866 path = tempfile.mkdtemp(prefix='test-mkdtemp-') 867 os.rmdir(path) 868 self.assertIsInstance(path, str) 869 path = tempfile.mkdtemp(dir=tempfile.gettempdir()) 870 os.rmdir(path) 871 self.assertIsInstance(path, str) 872 finally: 873 tempfile.tempdir = orig_tempdir 874 875 def test_path_is_absolute(self): 876 # Test that the path returned by mkdtemp with a relative `dir` 877 # argument is absolute 878 try: 879 path = tempfile.mkdtemp(dir=".") 880 self.assertTrue(os.path.isabs(path)) 881 finally: 882 os.rmdir(path) 883 884 885class TestMktemp(BaseTestCase): 886 """Test mktemp().""" 887 888 # For safety, all use of mktemp must occur in a private directory. 889 # We must also suppress the RuntimeWarning it generates. 890 def setUp(self): 891 self.dir = tempfile.mkdtemp() 892 super().setUp() 893 894 def tearDown(self): 895 if self.dir: 896 os.rmdir(self.dir) 897 self.dir = None 898 super().tearDown() 899 900 class mktemped: 901 _unlink = os.unlink 902 _bflags = tempfile._bin_openflags 903 904 def __init__(self, dir, pre, suf): 905 self.name = tempfile.mktemp(dir=dir, prefix=pre, suffix=suf) 906 # Create the file. This will raise an exception if it's 907 # mysteriously appeared in the meanwhile. 908 os.close(os.open(self.name, self._bflags, 0o600)) 909 910 def __del__(self): 911 self._unlink(self.name) 912 913 def do_create(self, pre="", suf=""): 914 file = self.mktemped(self.dir, pre, suf) 915 916 self.nameCheck(file.name, self.dir, pre, suf) 917 return file 918 919 def test_basic(self): 920 # mktemp can choose usable file names 921 self.do_create() 922 self.do_create(pre="a") 923 self.do_create(suf="b") 924 self.do_create(pre="a", suf="b") 925 self.do_create(pre="aa", suf=".txt") 926 927 def test_many(self): 928 # mktemp can choose many usable file names (stochastic) 929 extant = list(range(TEST_FILES)) 930 for i in extant: 931 extant[i] = self.do_create(pre="aa") 932 del extant 933 support.gc_collect() # For PyPy or other GCs. 934 935## def test_warning(self): 936## # mktemp issues a warning when used 937## warnings.filterwarnings("error", 938## category=RuntimeWarning, 939## message="mktemp") 940## self.assertRaises(RuntimeWarning, 941## tempfile.mktemp, dir=self.dir) 942 943 944# We test _TemporaryFileWrapper by testing NamedTemporaryFile. 945 946 947class TestNamedTemporaryFile(BaseTestCase): 948 """Test NamedTemporaryFile().""" 949 950 def do_create(self, dir=None, pre="", suf="", delete=True): 951 if dir is None: 952 dir = tempfile.gettempdir() 953 file = tempfile.NamedTemporaryFile(dir=dir, prefix=pre, suffix=suf, 954 delete=delete) 955 956 self.nameCheck(file.name, dir, pre, suf) 957 return file 958 959 960 def test_basic(self): 961 # NamedTemporaryFile can create files 962 self.do_create() 963 self.do_create(pre="a") 964 self.do_create(suf="b") 965 self.do_create(pre="a", suf="b") 966 self.do_create(pre="aa", suf=".txt") 967 968 def test_method_lookup(self): 969 # Issue #18879: Looking up a temporary file method should keep it 970 # alive long enough. 971 f = self.do_create() 972 wr = weakref.ref(f) 973 write = f.write 974 write2 = f.write 975 del f 976 write(b'foo') 977 del write 978 write2(b'bar') 979 del write2 980 if support.check_impl_detail(cpython=True): 981 # No reference cycle was created. 982 self.assertIsNone(wr()) 983 984 def test_iter(self): 985 # Issue #23700: getting iterator from a temporary file should keep 986 # it alive as long as it's being iterated over 987 lines = [b'spam\n', b'eggs\n', b'beans\n'] 988 def make_file(): 989 f = tempfile.NamedTemporaryFile(mode='w+b') 990 f.write(b''.join(lines)) 991 f.seek(0) 992 return f 993 for i, l in enumerate(make_file()): 994 self.assertEqual(l, lines[i]) 995 self.assertEqual(i, len(lines) - 1) 996 997 def test_creates_named(self): 998 # NamedTemporaryFile creates files with names 999 f = tempfile.NamedTemporaryFile() 1000 self.assertTrue(os.path.exists(f.name), 1001 "NamedTemporaryFile %s does not exist" % f.name) 1002 1003 def test_del_on_close(self): 1004 # A NamedTemporaryFile is deleted when closed 1005 dir = tempfile.mkdtemp() 1006 try: 1007 with tempfile.NamedTemporaryFile(dir=dir) as f: 1008 f.write(b'blat') 1009 self.assertEqual(os.listdir(dir), []) 1010 self.assertFalse(os.path.exists(f.name), 1011 "NamedTemporaryFile %s exists after close" % f.name) 1012 finally: 1013 os.rmdir(dir) 1014 1015 def test_dis_del_on_close(self): 1016 # Tests that delete-on-close can be disabled 1017 dir = tempfile.mkdtemp() 1018 tmp = None 1019 try: 1020 f = tempfile.NamedTemporaryFile(dir=dir, delete=False) 1021 tmp = f.name 1022 f.write(b'blat') 1023 f.close() 1024 self.assertTrue(os.path.exists(f.name), 1025 "NamedTemporaryFile %s missing after close" % f.name) 1026 finally: 1027 if tmp is not None: 1028 os.unlink(tmp) 1029 os.rmdir(dir) 1030 1031 def test_multiple_close(self): 1032 # A NamedTemporaryFile can be closed many times without error 1033 f = tempfile.NamedTemporaryFile() 1034 f.write(b'abc\n') 1035 f.close() 1036 f.close() 1037 f.close() 1038 1039 def test_context_manager(self): 1040 # A NamedTemporaryFile can be used as a context manager 1041 with tempfile.NamedTemporaryFile() as f: 1042 self.assertTrue(os.path.exists(f.name)) 1043 self.assertFalse(os.path.exists(f.name)) 1044 def use_closed(): 1045 with f: 1046 pass 1047 self.assertRaises(ValueError, use_closed) 1048 1049 def test_context_man_not_del_on_close_if_delete_on_close_false(self): 1050 # Issue gh-58451: tempfile.NamedTemporaryFile is not particularly useful 1051 # on Windows 1052 # A NamedTemporaryFile is NOT deleted when closed if 1053 # delete_on_close=False, but is deleted on context manager exit 1054 dir = tempfile.mkdtemp() 1055 try: 1056 with tempfile.NamedTemporaryFile(dir=dir, 1057 delete=True, 1058 delete_on_close=False) as f: 1059 f.write(b'blat') 1060 f_name = f.name 1061 f.close() 1062 with self.subTest(): 1063 # Testing that file is not deleted on close 1064 self.assertTrue(os.path.exists(f.name), 1065 f"NamedTemporaryFile {f.name!r} is incorrectly " 1066 f"deleted on closure when delete_on_close=False") 1067 1068 with self.subTest(): 1069 # Testing that file is deleted on context manager exit 1070 self.assertFalse(os.path.exists(f.name), 1071 f"NamedTemporaryFile {f.name!r} exists " 1072 f"after context manager exit") 1073 1074 finally: 1075 os.rmdir(dir) 1076 1077 def test_context_man_ok_to_delete_manually(self): 1078 # In the case of delete=True, a NamedTemporaryFile can be manually 1079 # deleted in a with-statement context without causing an error. 1080 dir = tempfile.mkdtemp() 1081 try: 1082 with tempfile.NamedTemporaryFile(dir=dir, 1083 delete=True, 1084 delete_on_close=False) as f: 1085 f.write(b'blat') 1086 f.close() 1087 os.unlink(f.name) 1088 1089 finally: 1090 os.rmdir(dir) 1091 1092 def test_context_man_not_del_if_delete_false(self): 1093 # A NamedTemporaryFile is not deleted if delete = False 1094 dir = tempfile.mkdtemp() 1095 f_name = "" 1096 try: 1097 # Test that delete_on_close=True has no effect if delete=False. 1098 with tempfile.NamedTemporaryFile(dir=dir, delete=False, 1099 delete_on_close=True) as f: 1100 f.write(b'blat') 1101 f_name = f.name 1102 self.assertTrue(os.path.exists(f.name), 1103 f"NamedTemporaryFile {f.name!r} exists after close") 1104 finally: 1105 os.unlink(f_name) 1106 os.rmdir(dir) 1107 1108 def test_del_by_finalizer(self): 1109 # A NamedTemporaryFile is deleted when finalized in the case of 1110 # delete=True, delete_on_close=False, and no with-statement is used. 1111 def my_func(dir): 1112 f = tempfile.NamedTemporaryFile(dir=dir, delete=True, 1113 delete_on_close=False) 1114 tmp_name = f.name 1115 f.write(b'blat') 1116 # Testing extreme case, where the file is not explicitly closed 1117 # f.close() 1118 return tmp_name 1119 # Make sure that the garbage collector has finalized the file object. 1120 gc.collect() 1121 dir = tempfile.mkdtemp() 1122 try: 1123 tmp_name = my_func(dir) 1124 self.assertFalse(os.path.exists(tmp_name), 1125 f"NamedTemporaryFile {tmp_name!r} " 1126 f"exists after finalizer ") 1127 finally: 1128 os.rmdir(dir) 1129 1130 def test_correct_finalizer_work_if_already_deleted(self): 1131 # There should be no error in the case of delete=True, 1132 # delete_on_close=False, no with-statement is used, and the file is 1133 # deleted manually. 1134 def my_func(dir)->str: 1135 f = tempfile.NamedTemporaryFile(dir=dir, delete=True, 1136 delete_on_close=False) 1137 tmp_name = f.name 1138 f.write(b'blat') 1139 f.close() 1140 os.unlink(tmp_name) 1141 return tmp_name 1142 # Make sure that the garbage collector has finalized the file object. 1143 gc.collect() 1144 1145 def test_bad_mode(self): 1146 dir = tempfile.mkdtemp() 1147 self.addCleanup(os_helper.rmtree, dir) 1148 with self.assertRaises(ValueError): 1149 tempfile.NamedTemporaryFile(mode='wr', dir=dir) 1150 with self.assertRaises(TypeError): 1151 tempfile.NamedTemporaryFile(mode=2, dir=dir) 1152 self.assertEqual(os.listdir(dir), []) 1153 1154 def test_bad_encoding(self): 1155 dir = tempfile.mkdtemp() 1156 self.addCleanup(os_helper.rmtree, dir) 1157 with self.assertRaises(LookupError): 1158 tempfile.NamedTemporaryFile('w', encoding='bad-encoding', dir=dir) 1159 self.assertEqual(os.listdir(dir), []) 1160 1161 def test_unexpected_error(self): 1162 dir = tempfile.mkdtemp() 1163 self.addCleanup(os_helper.rmtree, dir) 1164 with mock.patch('tempfile._TemporaryFileWrapper') as mock_ntf, \ 1165 mock.patch('io.open', mock.mock_open()) as mock_open: 1166 mock_ntf.side_effect = KeyboardInterrupt() 1167 with self.assertRaises(KeyboardInterrupt): 1168 tempfile.NamedTemporaryFile(dir=dir) 1169 mock_open().close.assert_called() 1170 self.assertEqual(os.listdir(dir), []) 1171 1172 # How to test the mode and bufsize parameters? 1173 1174class TestSpooledTemporaryFile(BaseTestCase): 1175 """Test SpooledTemporaryFile().""" 1176 1177 def do_create(self, max_size=0, dir=None, pre="", suf=""): 1178 if dir is None: 1179 dir = tempfile.gettempdir() 1180 file = tempfile.SpooledTemporaryFile(max_size=max_size, dir=dir, prefix=pre, suffix=suf) 1181 1182 return file 1183 1184 1185 def test_basic(self): 1186 # SpooledTemporaryFile can create files 1187 f = self.do_create() 1188 self.assertFalse(f._rolled) 1189 f = self.do_create(max_size=100, pre="a", suf=".txt") 1190 self.assertFalse(f._rolled) 1191 1192 def test_is_iobase(self): 1193 # SpooledTemporaryFile should implement io.IOBase 1194 self.assertIsInstance(self.do_create(), io.IOBase) 1195 1196 def test_iobase_interface(self): 1197 # SpooledTemporaryFile should implement the io.IOBase interface. 1198 # Ensure it has all the required methods and properties. 1199 iobase_attrs = { 1200 # From IOBase 1201 'fileno', 'seek', 'truncate', 'close', 'closed', '__enter__', 1202 '__exit__', 'flush', 'isatty', '__iter__', '__next__', 'readable', 1203 'readline', 'readlines', 'seekable', 'tell', 'writable', 1204 'writelines', 1205 # From BufferedIOBase (binary mode) and TextIOBase (text mode) 1206 'detach', 'read', 'read1', 'write', 'readinto', 'readinto1', 1207 'encoding', 'errors', 'newlines', 1208 } 1209 spooledtempfile_attrs = set(dir(tempfile.SpooledTemporaryFile)) 1210 missing_attrs = iobase_attrs - spooledtempfile_attrs 1211 self.assertFalse( 1212 missing_attrs, 1213 'SpooledTemporaryFile missing attributes from ' 1214 'IOBase/BufferedIOBase/TextIOBase' 1215 ) 1216 1217 def test_del_on_close(self): 1218 # A SpooledTemporaryFile is deleted when closed 1219 dir = tempfile.mkdtemp() 1220 try: 1221 f = tempfile.SpooledTemporaryFile(max_size=10, dir=dir) 1222 self.assertFalse(f._rolled) 1223 f.write(b'blat ' * 5) 1224 self.assertTrue(f._rolled) 1225 filename = f.name 1226 f.close() 1227 self.assertEqual(os.listdir(dir), []) 1228 if not isinstance(filename, int): 1229 self.assertFalse(os.path.exists(filename), 1230 "SpooledTemporaryFile %s exists after close" % filename) 1231 finally: 1232 os.rmdir(dir) 1233 1234 def test_del_unrolled_file(self): 1235 # The unrolled SpooledTemporaryFile should raise a ResourceWarning 1236 # when deleted since the file was not explicitly closed. 1237 f = self.do_create(max_size=10) 1238 f.write(b'foo') 1239 self.assertEqual(f.name, None) # Unrolled so no filename/fd 1240 with self.assertWarns(ResourceWarning): 1241 f.__del__() 1242 1243 @unittest.skipIf( 1244 support.is_emscripten, "Emscripten cannot fstat renamed files." 1245 ) 1246 def test_del_rolled_file(self): 1247 # The rolled file should be deleted when the SpooledTemporaryFile 1248 # object is deleted. This should raise a ResourceWarning since the file 1249 # was not explicitly closed. 1250 f = self.do_create(max_size=2) 1251 f.write(b'foo') 1252 name = f.name # This is a fd on posix+cygwin, a filename everywhere else 1253 self.assertTrue(os.path.exists(name)) 1254 with self.assertWarns(ResourceWarning): 1255 f.__del__() 1256 self.assertFalse( 1257 os.path.exists(name), 1258 "Rolled SpooledTemporaryFile (name=%s) exists after delete" % name 1259 ) 1260 1261 def test_rewrite_small(self): 1262 # A SpooledTemporaryFile can be written to multiple within the max_size 1263 f = self.do_create(max_size=30) 1264 self.assertFalse(f._rolled) 1265 for i in range(5): 1266 f.seek(0, 0) 1267 f.write(b'x' * 20) 1268 self.assertFalse(f._rolled) 1269 1270 def test_write_sequential(self): 1271 # A SpooledTemporaryFile should hold exactly max_size bytes, and roll 1272 # over afterward 1273 f = self.do_create(max_size=30) 1274 self.assertFalse(f._rolled) 1275 f.write(b'x' * 20) 1276 self.assertFalse(f._rolled) 1277 f.write(b'x' * 10) 1278 self.assertFalse(f._rolled) 1279 f.write(b'x') 1280 self.assertTrue(f._rolled) 1281 1282 def test_writelines(self): 1283 # Verify writelines with a SpooledTemporaryFile 1284 f = self.do_create() 1285 f.writelines((b'x', b'y', b'z')) 1286 pos = f.seek(0) 1287 self.assertEqual(pos, 0) 1288 buf = f.read() 1289 self.assertEqual(buf, b'xyz') 1290 1291 def test_writelines_sequential(self): 1292 # A SpooledTemporaryFile should hold exactly max_size bytes, and roll 1293 # over afterward 1294 f = self.do_create(max_size=35) 1295 f.writelines((b'x' * 20, b'x' * 10, b'x' * 5)) 1296 self.assertFalse(f._rolled) 1297 f.write(b'x') 1298 self.assertTrue(f._rolled) 1299 1300 def test_sparse(self): 1301 # A SpooledTemporaryFile that is written late in the file will extend 1302 # when that occurs 1303 f = self.do_create(max_size=30) 1304 self.assertFalse(f._rolled) 1305 pos = f.seek(100, 0) 1306 self.assertEqual(pos, 100) 1307 self.assertFalse(f._rolled) 1308 f.write(b'x') 1309 self.assertTrue(f._rolled) 1310 1311 def test_fileno(self): 1312 # A SpooledTemporaryFile should roll over to a real file on fileno() 1313 f = self.do_create(max_size=30) 1314 self.assertFalse(f._rolled) 1315 self.assertTrue(f.fileno() > 0) 1316 self.assertTrue(f._rolled) 1317 1318 def test_multiple_close_before_rollover(self): 1319 # A SpooledTemporaryFile can be closed many times without error 1320 f = tempfile.SpooledTemporaryFile() 1321 f.write(b'abc\n') 1322 self.assertFalse(f._rolled) 1323 f.close() 1324 f.close() 1325 f.close() 1326 1327 def test_multiple_close_after_rollover(self): 1328 # A SpooledTemporaryFile can be closed many times without error 1329 f = tempfile.SpooledTemporaryFile(max_size=1) 1330 f.write(b'abc\n') 1331 self.assertTrue(f._rolled) 1332 f.close() 1333 f.close() 1334 f.close() 1335 1336 def test_bound_methods(self): 1337 # It should be OK to steal a bound method from a SpooledTemporaryFile 1338 # and use it independently; when the file rolls over, those bound 1339 # methods should continue to function 1340 f = self.do_create(max_size=30) 1341 read = f.read 1342 write = f.write 1343 seek = f.seek 1344 1345 write(b"a" * 35) 1346 write(b"b" * 35) 1347 seek(0, 0) 1348 self.assertEqual(read(70), b'a'*35 + b'b'*35) 1349 1350 def test_properties(self): 1351 f = tempfile.SpooledTemporaryFile(max_size=10) 1352 f.write(b'x' * 10) 1353 self.assertFalse(f._rolled) 1354 self.assertEqual(f.mode, 'w+b') 1355 self.assertIsNone(f.name) 1356 with self.assertRaises(AttributeError): 1357 f.newlines 1358 with self.assertRaises(AttributeError): 1359 f.encoding 1360 with self.assertRaises(AttributeError): 1361 f.errors 1362 1363 f.write(b'x') 1364 self.assertTrue(f._rolled) 1365 self.assertEqual(f.mode, 'rb+') 1366 self.assertIsNotNone(f.name) 1367 with self.assertRaises(AttributeError): 1368 f.newlines 1369 with self.assertRaises(AttributeError): 1370 f.encoding 1371 with self.assertRaises(AttributeError): 1372 f.errors 1373 1374 def test_text_mode(self): 1375 # Creating a SpooledTemporaryFile with a text mode should produce 1376 # a file object reading and writing (Unicode) text strings. 1377 f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10, 1378 encoding="utf-8") 1379 f.write("abc\n") 1380 f.seek(0) 1381 self.assertEqual(f.read(), "abc\n") 1382 f.write("def\n") 1383 f.seek(0) 1384 self.assertEqual(f.read(), "abc\ndef\n") 1385 self.assertFalse(f._rolled) 1386 self.assertEqual(f.mode, 'w+') 1387 self.assertIsNone(f.name) 1388 self.assertEqual(f.newlines, os.linesep) 1389 self.assertEqual(f.encoding, "utf-8") 1390 self.assertEqual(f.errors, "strict") 1391 1392 f.write("xyzzy\n") 1393 f.seek(0) 1394 self.assertEqual(f.read(), "abc\ndef\nxyzzy\n") 1395 # Check that Ctrl+Z doesn't truncate the file 1396 f.write("foo\x1abar\n") 1397 f.seek(0) 1398 self.assertEqual(f.read(), "abc\ndef\nxyzzy\nfoo\x1abar\n") 1399 self.assertTrue(f._rolled) 1400 self.assertEqual(f.mode, 'w+') 1401 self.assertIsNotNone(f.name) 1402 self.assertEqual(f.newlines, os.linesep) 1403 self.assertEqual(f.encoding, "utf-8") 1404 self.assertEqual(f.errors, "strict") 1405 1406 def test_text_newline_and_encoding(self): 1407 f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10, 1408 newline='', encoding='utf-8', 1409 errors='ignore') 1410 f.write("\u039B\r\n") 1411 f.seek(0) 1412 self.assertEqual(f.read(), "\u039B\r\n") 1413 self.assertFalse(f._rolled) 1414 self.assertEqual(f.mode, 'w+') 1415 self.assertIsNone(f.name) 1416 self.assertIsNotNone(f.newlines) 1417 self.assertEqual(f.encoding, "utf-8") 1418 self.assertEqual(f.errors, "ignore") 1419 1420 f.write("\u039C" * 10 + "\r\n") 1421 f.write("\u039D" * 20) 1422 f.seek(0) 1423 self.assertEqual(f.read(), 1424 "\u039B\r\n" + ("\u039C" * 10) + "\r\n" + ("\u039D" * 20)) 1425 self.assertTrue(f._rolled) 1426 self.assertEqual(f.mode, 'w+') 1427 self.assertIsNotNone(f.name) 1428 self.assertIsNotNone(f.newlines) 1429 self.assertEqual(f.encoding, 'utf-8') 1430 self.assertEqual(f.errors, 'ignore') 1431 1432 def test_context_manager_before_rollover(self): 1433 # A SpooledTemporaryFile can be used as a context manager 1434 with tempfile.SpooledTemporaryFile(max_size=1) as f: 1435 self.assertFalse(f._rolled) 1436 self.assertFalse(f.closed) 1437 self.assertTrue(f.closed) 1438 def use_closed(): 1439 with f: 1440 pass 1441 self.assertRaises(ValueError, use_closed) 1442 1443 def test_context_manager_during_rollover(self): 1444 # A SpooledTemporaryFile can be used as a context manager 1445 with tempfile.SpooledTemporaryFile(max_size=1) as f: 1446 self.assertFalse(f._rolled) 1447 f.write(b'abc\n') 1448 f.flush() 1449 self.assertTrue(f._rolled) 1450 self.assertFalse(f.closed) 1451 self.assertTrue(f.closed) 1452 def use_closed(): 1453 with f: 1454 pass 1455 self.assertRaises(ValueError, use_closed) 1456 1457 def test_context_manager_after_rollover(self): 1458 # A SpooledTemporaryFile can be used as a context manager 1459 f = tempfile.SpooledTemporaryFile(max_size=1) 1460 f.write(b'abc\n') 1461 f.flush() 1462 self.assertTrue(f._rolled) 1463 with f: 1464 self.assertFalse(f.closed) 1465 self.assertTrue(f.closed) 1466 def use_closed(): 1467 with f: 1468 pass 1469 self.assertRaises(ValueError, use_closed) 1470 1471 @unittest.skipIf( 1472 support.is_emscripten, "Emscripten cannot fstat renamed files." 1473 ) 1474 def test_truncate_with_size_parameter(self): 1475 # A SpooledTemporaryFile can be truncated to zero size 1476 f = tempfile.SpooledTemporaryFile(max_size=10) 1477 f.write(b'abcdefg\n') 1478 f.seek(0) 1479 f.truncate() 1480 self.assertFalse(f._rolled) 1481 self.assertEqual(f._file.getvalue(), b'') 1482 # A SpooledTemporaryFile can be truncated to a specific size 1483 f = tempfile.SpooledTemporaryFile(max_size=10) 1484 f.write(b'abcdefg\n') 1485 f.truncate(4) 1486 self.assertFalse(f._rolled) 1487 self.assertEqual(f._file.getvalue(), b'abcd') 1488 # A SpooledTemporaryFile rolls over if truncated to large size 1489 f = tempfile.SpooledTemporaryFile(max_size=10) 1490 f.write(b'abcdefg\n') 1491 f.truncate(20) 1492 self.assertTrue(f._rolled) 1493 self.assertEqual(os.fstat(f.fileno()).st_size, 20) 1494 1495 def test_class_getitem(self): 1496 self.assertIsInstance(tempfile.SpooledTemporaryFile[bytes], 1497 types.GenericAlias) 1498 1499if tempfile.NamedTemporaryFile is not tempfile.TemporaryFile: 1500 1501 class TestTemporaryFile(BaseTestCase): 1502 """Test TemporaryFile().""" 1503 1504 def test_basic(self): 1505 # TemporaryFile can create files 1506 # No point in testing the name params - the file has no name. 1507 tempfile.TemporaryFile() 1508 1509 def test_has_no_name(self): 1510 # TemporaryFile creates files with no names (on this system) 1511 dir = tempfile.mkdtemp() 1512 f = tempfile.TemporaryFile(dir=dir) 1513 f.write(b'blat') 1514 1515 # Sneaky: because this file has no name, it should not prevent 1516 # us from removing the directory it was created in. 1517 try: 1518 os.rmdir(dir) 1519 except: 1520 # cleanup 1521 f.close() 1522 os.rmdir(dir) 1523 raise 1524 1525 def test_multiple_close(self): 1526 # A TemporaryFile can be closed many times without error 1527 f = tempfile.TemporaryFile() 1528 f.write(b'abc\n') 1529 f.close() 1530 f.close() 1531 f.close() 1532 1533 # How to test the mode and bufsize parameters? 1534 def test_mode_and_encoding(self): 1535 1536 def roundtrip(input, *args, **kwargs): 1537 with tempfile.TemporaryFile(*args, **kwargs) as fileobj: 1538 fileobj.write(input) 1539 fileobj.seek(0) 1540 self.assertEqual(input, fileobj.read()) 1541 1542 roundtrip(b"1234", "w+b") 1543 roundtrip("abdc\n", "w+") 1544 roundtrip("\u039B", "w+", encoding="utf-16") 1545 roundtrip("foo\r\n", "w+", newline="") 1546 1547 def test_bad_mode(self): 1548 dir = tempfile.mkdtemp() 1549 self.addCleanup(os_helper.rmtree, dir) 1550 with self.assertRaises(ValueError): 1551 tempfile.TemporaryFile(mode='wr', dir=dir) 1552 with self.assertRaises(TypeError): 1553 tempfile.TemporaryFile(mode=2, dir=dir) 1554 self.assertEqual(os.listdir(dir), []) 1555 1556 def test_bad_encoding(self): 1557 dir = tempfile.mkdtemp() 1558 self.addCleanup(os_helper.rmtree, dir) 1559 with self.assertRaises(LookupError): 1560 tempfile.TemporaryFile('w', encoding='bad-encoding', dir=dir) 1561 self.assertEqual(os.listdir(dir), []) 1562 1563 def test_unexpected_error(self): 1564 dir = tempfile.mkdtemp() 1565 self.addCleanup(os_helper.rmtree, dir) 1566 with mock.patch('tempfile._O_TMPFILE_WORKS', False), \ 1567 mock.patch('os.unlink') as mock_unlink, \ 1568 mock.patch('os.open') as mock_open, \ 1569 mock.patch('os.close') as mock_close: 1570 mock_unlink.side_effect = KeyboardInterrupt() 1571 with self.assertRaises(KeyboardInterrupt): 1572 tempfile.TemporaryFile(dir=dir) 1573 mock_close.assert_called() 1574 self.assertEqual(os.listdir(dir), []) 1575 1576 1577# Helper for test_del_on_shutdown 1578class NulledModules: 1579 def __init__(self, *modules): 1580 self.refs = [mod.__dict__ for mod in modules] 1581 self.contents = [ref.copy() for ref in self.refs] 1582 1583 def __enter__(self): 1584 for d in self.refs: 1585 for key in d: 1586 d[key] = None 1587 1588 def __exit__(self, *exc_info): 1589 for d, c in zip(self.refs, self.contents): 1590 d.clear() 1591 d.update(c) 1592 1593 1594class TestTemporaryDirectory(BaseTestCase): 1595 """Test TemporaryDirectory().""" 1596 1597 def do_create(self, dir=None, pre="", suf="", recurse=1, dirs=1, files=1, 1598 ignore_cleanup_errors=False): 1599 if dir is None: 1600 dir = tempfile.gettempdir() 1601 tmp = tempfile.TemporaryDirectory( 1602 dir=dir, prefix=pre, suffix=suf, 1603 ignore_cleanup_errors=ignore_cleanup_errors) 1604 self.nameCheck(tmp.name, dir, pre, suf) 1605 self.do_create2(tmp.name, recurse, dirs, files) 1606 return tmp 1607 1608 def do_create2(self, path, recurse=1, dirs=1, files=1): 1609 # Create subdirectories and some files 1610 if recurse: 1611 for i in range(dirs): 1612 name = os.path.join(path, "dir%d" % i) 1613 os.mkdir(name) 1614 self.do_create2(name, recurse-1, dirs, files) 1615 for i in range(files): 1616 with open(os.path.join(path, "test%d.txt" % i), "wb") as f: 1617 f.write(b"Hello world!") 1618 1619 def test_mkdtemp_failure(self): 1620 # Check no additional exception if mkdtemp fails 1621 # Previously would raise AttributeError instead 1622 # (noted as part of Issue #10188) 1623 with tempfile.TemporaryDirectory() as nonexistent: 1624 pass 1625 with self.assertRaises(FileNotFoundError) as cm: 1626 tempfile.TemporaryDirectory(dir=nonexistent) 1627 self.assertEqual(cm.exception.errno, errno.ENOENT) 1628 1629 def test_explicit_cleanup(self): 1630 # A TemporaryDirectory is deleted when cleaned up 1631 dir = tempfile.mkdtemp() 1632 try: 1633 d = self.do_create(dir=dir) 1634 self.assertTrue(os.path.exists(d.name), 1635 "TemporaryDirectory %s does not exist" % d.name) 1636 d.cleanup() 1637 self.assertFalse(os.path.exists(d.name), 1638 "TemporaryDirectory %s exists after cleanup" % d.name) 1639 finally: 1640 os.rmdir(dir) 1641 1642 def test_explicit_cleanup_ignore_errors(self): 1643 """Test that cleanup doesn't return an error when ignoring them.""" 1644 with tempfile.TemporaryDirectory() as working_dir: 1645 temp_dir = self.do_create( 1646 dir=working_dir, ignore_cleanup_errors=True) 1647 temp_path = pathlib.Path(temp_dir.name) 1648 self.assertTrue(temp_path.exists(), 1649 f"TemporaryDirectory {temp_path!s} does not exist") 1650 with open(temp_path / "a_file.txt", "w+t") as open_file: 1651 open_file.write("Hello world!\n") 1652 temp_dir.cleanup() 1653 self.assertEqual(len(list(temp_path.glob("*"))), 1654 int(sys.platform.startswith("win")), 1655 "Unexpected number of files in " 1656 f"TemporaryDirectory {temp_path!s}") 1657 self.assertEqual( 1658 temp_path.exists(), 1659 sys.platform.startswith("win"), 1660 f"TemporaryDirectory {temp_path!s} existence state unexpected") 1661 temp_dir.cleanup() 1662 self.assertFalse( 1663 temp_path.exists(), 1664 f"TemporaryDirectory {temp_path!s} exists after cleanup") 1665 1666 @unittest.skipUnless(os.name == "nt", "Only on Windows.") 1667 def test_explicit_cleanup_correct_error(self): 1668 with tempfile.TemporaryDirectory() as working_dir: 1669 temp_dir = self.do_create(dir=working_dir) 1670 with open(os.path.join(temp_dir.name, "example.txt"), 'wb'): 1671 # Previously raised NotADirectoryError on some OSes 1672 # (e.g. Windows). See bpo-43153. 1673 with self.assertRaises(PermissionError): 1674 temp_dir.cleanup() 1675 1676 @unittest.skipUnless(os.name == "nt", "Only on Windows.") 1677 def test_cleanup_with_used_directory(self): 1678 with tempfile.TemporaryDirectory() as working_dir: 1679 temp_dir = self.do_create(dir=working_dir) 1680 subdir = os.path.join(temp_dir.name, "subdir") 1681 os.mkdir(subdir) 1682 with os_helper.change_cwd(subdir): 1683 # Previously raised RecursionError on some OSes 1684 # (e.g. Windows). See bpo-35144. 1685 with self.assertRaises(PermissionError): 1686 temp_dir.cleanup() 1687 1688 @os_helper.skip_unless_symlink 1689 def test_cleanup_with_symlink_to_a_directory(self): 1690 # cleanup() should not follow symlinks to directories (issue #12464) 1691 d1 = self.do_create() 1692 d2 = self.do_create(recurse=0) 1693 1694 # Symlink d1/foo -> d2 1695 os.symlink(d2.name, os.path.join(d1.name, "foo")) 1696 1697 # This call to cleanup() should not follow the "foo" symlink 1698 d1.cleanup() 1699 1700 self.assertFalse(os.path.exists(d1.name), 1701 "TemporaryDirectory %s exists after cleanup" % d1.name) 1702 self.assertTrue(os.path.exists(d2.name), 1703 "Directory pointed to by a symlink was deleted") 1704 self.assertEqual(os.listdir(d2.name), ['test0.txt'], 1705 "Contents of the directory pointed to by a symlink " 1706 "were deleted") 1707 d2.cleanup() 1708 1709 @os_helper.skip_unless_symlink 1710 def test_cleanup_with_symlink_modes(self): 1711 # cleanup() should not follow symlinks when fixing mode bits (#91133) 1712 with self.do_create(recurse=0) as d2: 1713 file1 = os.path.join(d2, 'file1') 1714 open(file1, 'wb').close() 1715 dir1 = os.path.join(d2, 'dir1') 1716 os.mkdir(dir1) 1717 for mode in range(8): 1718 mode <<= 6 1719 with self.subTest(mode=format(mode, '03o')): 1720 def test(target, target_is_directory): 1721 d1 = self.do_create(recurse=0) 1722 symlink = os.path.join(d1.name, 'symlink') 1723 os.symlink(target, symlink, 1724 target_is_directory=target_is_directory) 1725 try: 1726 os.chmod(symlink, mode, follow_symlinks=False) 1727 except NotImplementedError: 1728 pass 1729 try: 1730 os.chmod(symlink, mode) 1731 except FileNotFoundError: 1732 pass 1733 os.chmod(d1.name, mode) 1734 d1.cleanup() 1735 self.assertFalse(os.path.exists(d1.name)) 1736 1737 with self.subTest('nonexisting file'): 1738 test('nonexisting', target_is_directory=False) 1739 with self.subTest('nonexisting dir'): 1740 test('nonexisting', target_is_directory=True) 1741 1742 with self.subTest('existing file'): 1743 os.chmod(file1, mode) 1744 old_mode = os.stat(file1).st_mode 1745 test(file1, target_is_directory=False) 1746 new_mode = os.stat(file1).st_mode 1747 self.assertEqual(new_mode, old_mode, 1748 '%03o != %03o' % (new_mode, old_mode)) 1749 1750 with self.subTest('existing dir'): 1751 os.chmod(dir1, mode) 1752 old_mode = os.stat(dir1).st_mode 1753 test(dir1, target_is_directory=True) 1754 new_mode = os.stat(dir1).st_mode 1755 self.assertEqual(new_mode, old_mode, 1756 '%03o != %03o' % (new_mode, old_mode)) 1757 1758 @unittest.skipUnless(hasattr(os, 'chflags'), 'requires os.chflags') 1759 @os_helper.skip_unless_symlink 1760 def test_cleanup_with_symlink_flags(self): 1761 # cleanup() should not follow symlinks when fixing flags (#91133) 1762 flags = stat.UF_IMMUTABLE | stat.UF_NOUNLINK 1763 self.check_flags(flags) 1764 1765 with self.do_create(recurse=0) as d2: 1766 file1 = os.path.join(d2, 'file1') 1767 open(file1, 'wb').close() 1768 dir1 = os.path.join(d2, 'dir1') 1769 os.mkdir(dir1) 1770 def test(target, target_is_directory): 1771 d1 = self.do_create(recurse=0) 1772 symlink = os.path.join(d1.name, 'symlink') 1773 os.symlink(target, symlink, 1774 target_is_directory=target_is_directory) 1775 try: 1776 os.chflags(symlink, flags, follow_symlinks=False) 1777 except NotImplementedError: 1778 pass 1779 try: 1780 os.chflags(symlink, flags) 1781 except FileNotFoundError: 1782 pass 1783 os.chflags(d1.name, flags) 1784 d1.cleanup() 1785 self.assertFalse(os.path.exists(d1.name)) 1786 1787 with self.subTest('nonexisting file'): 1788 test('nonexisting', target_is_directory=False) 1789 with self.subTest('nonexisting dir'): 1790 test('nonexisting', target_is_directory=True) 1791 1792 with self.subTest('existing file'): 1793 os.chflags(file1, flags) 1794 old_flags = os.stat(file1).st_flags 1795 test(file1, target_is_directory=False) 1796 new_flags = os.stat(file1).st_flags 1797 self.assertEqual(new_flags, old_flags) 1798 1799 with self.subTest('existing dir'): 1800 os.chflags(dir1, flags) 1801 old_flags = os.stat(dir1).st_flags 1802 test(dir1, target_is_directory=True) 1803 new_flags = os.stat(dir1).st_flags 1804 self.assertEqual(new_flags, old_flags) 1805 1806 @support.cpython_only 1807 def test_del_on_collection(self): 1808 # A TemporaryDirectory is deleted when garbage collected 1809 dir = tempfile.mkdtemp() 1810 try: 1811 d = self.do_create(dir=dir) 1812 name = d.name 1813 del d # Rely on refcounting to invoke __del__ 1814 self.assertFalse(os.path.exists(name), 1815 "TemporaryDirectory %s exists after __del__" % name) 1816 finally: 1817 os.rmdir(dir) 1818 1819 @support.cpython_only 1820 def test_del_on_collection_ignore_errors(self): 1821 """Test that ignoring errors works when TemporaryDirectory is gced.""" 1822 with tempfile.TemporaryDirectory() as working_dir: 1823 temp_dir = self.do_create( 1824 dir=working_dir, ignore_cleanup_errors=True) 1825 temp_path = pathlib.Path(temp_dir.name) 1826 self.assertTrue(temp_path.exists(), 1827 f"TemporaryDirectory {temp_path!s} does not exist") 1828 with open(temp_path / "a_file.txt", "w+t") as open_file: 1829 open_file.write("Hello world!\n") 1830 del temp_dir 1831 self.assertEqual(len(list(temp_path.glob("*"))), 1832 int(sys.platform.startswith("win")), 1833 "Unexpected number of files in " 1834 f"TemporaryDirectory {temp_path!s}") 1835 self.assertEqual( 1836 temp_path.exists(), 1837 sys.platform.startswith("win"), 1838 f"TemporaryDirectory {temp_path!s} existence state unexpected") 1839 1840 def test_del_on_shutdown(self): 1841 # A TemporaryDirectory may be cleaned up during shutdown 1842 with self.do_create() as dir: 1843 for mod in ('builtins', 'os', 'shutil', 'sys', 'tempfile', 'warnings'): 1844 code = """if True: 1845 import builtins 1846 import os 1847 import shutil 1848 import sys 1849 import tempfile 1850 import warnings 1851 1852 tmp = tempfile.TemporaryDirectory(dir={dir!r}) 1853 sys.stdout.buffer.write(tmp.name.encode()) 1854 1855 tmp2 = os.path.join(tmp.name, 'test_dir') 1856 os.mkdir(tmp2) 1857 with open(os.path.join(tmp2, "test0.txt"), "w") as f: 1858 f.write("Hello world!") 1859 1860 {mod}.tmp = tmp 1861 1862 warnings.filterwarnings("always", category=ResourceWarning) 1863 """.format(dir=dir, mod=mod) 1864 rc, out, err = script_helper.assert_python_ok("-c", code) 1865 tmp_name = out.decode().strip() 1866 self.assertFalse(os.path.exists(tmp_name), 1867 "TemporaryDirectory %s exists after cleanup" % tmp_name) 1868 err = err.decode('utf-8', 'backslashreplace') 1869 self.assertNotIn("Exception ", err) 1870 self.assertIn("ResourceWarning: Implicitly cleaning up", err) 1871 1872 def test_del_on_shutdown_ignore_errors(self): 1873 """Test ignoring errors works when a tempdir is gc'ed on shutdown.""" 1874 with tempfile.TemporaryDirectory() as working_dir: 1875 code = """if True: 1876 import pathlib 1877 import sys 1878 import tempfile 1879 import warnings 1880 1881 temp_dir = tempfile.TemporaryDirectory( 1882 dir={working_dir!r}, ignore_cleanup_errors=True) 1883 sys.stdout.buffer.write(temp_dir.name.encode()) 1884 1885 temp_dir_2 = pathlib.Path(temp_dir.name) / "test_dir" 1886 temp_dir_2.mkdir() 1887 with open(temp_dir_2 / "test0.txt", "w") as test_file: 1888 test_file.write("Hello world!") 1889 open_file = open(temp_dir_2 / "open_file.txt", "w") 1890 open_file.write("Hello world!") 1891 1892 warnings.filterwarnings("always", category=ResourceWarning) 1893 """.format(working_dir=working_dir) 1894 __, out, err = script_helper.assert_python_ok("-c", code) 1895 temp_path = pathlib.Path(out.decode().strip()) 1896 self.assertEqual(len(list(temp_path.glob("*"))), 1897 int(sys.platform.startswith("win")), 1898 "Unexpected number of files in " 1899 f"TemporaryDirectory {temp_path!s}") 1900 self.assertEqual( 1901 temp_path.exists(), 1902 sys.platform.startswith("win"), 1903 f"TemporaryDirectory {temp_path!s} existence state unexpected") 1904 err = err.decode('utf-8', 'backslashreplace') 1905 self.assertNotIn("Exception", err) 1906 self.assertNotIn("Error", err) 1907 self.assertIn("ResourceWarning: Implicitly cleaning up", err) 1908 1909 def test_exit_on_shutdown(self): 1910 # Issue #22427 1911 with self.do_create() as dir: 1912 code = """if True: 1913 import sys 1914 import tempfile 1915 import warnings 1916 1917 def generator(): 1918 with tempfile.TemporaryDirectory(dir={dir!r}) as tmp: 1919 yield tmp 1920 g = generator() 1921 sys.stdout.buffer.write(next(g).encode()) 1922 1923 warnings.filterwarnings("always", category=ResourceWarning) 1924 """.format(dir=dir) 1925 rc, out, err = script_helper.assert_python_ok("-c", code) 1926 tmp_name = out.decode().strip() 1927 self.assertFalse(os.path.exists(tmp_name), 1928 "TemporaryDirectory %s exists after cleanup" % tmp_name) 1929 err = err.decode('utf-8', 'backslashreplace') 1930 self.assertNotIn("Exception ", err) 1931 self.assertIn("ResourceWarning: Implicitly cleaning up", err) 1932 1933 def test_warnings_on_cleanup(self): 1934 # ResourceWarning will be triggered by __del__ 1935 with self.do_create() as dir: 1936 d = self.do_create(dir=dir, recurse=3) 1937 name = d.name 1938 1939 # Check for the resource warning 1940 with warnings_helper.check_warnings(('Implicitly', 1941 ResourceWarning), 1942 quiet=False): 1943 warnings.filterwarnings("always", category=ResourceWarning) 1944 del d 1945 support.gc_collect() 1946 self.assertFalse(os.path.exists(name), 1947 "TemporaryDirectory %s exists after __del__" % name) 1948 1949 def test_multiple_close(self): 1950 # Can be cleaned-up many times without error 1951 d = self.do_create() 1952 d.cleanup() 1953 d.cleanup() 1954 d.cleanup() 1955 1956 def test_context_manager(self): 1957 # Can be used as a context manager 1958 d = self.do_create() 1959 with d as name: 1960 self.assertTrue(os.path.exists(name)) 1961 self.assertEqual(name, d.name) 1962 self.assertFalse(os.path.exists(name)) 1963 1964 def test_modes(self): 1965 for mode in range(8): 1966 mode <<= 6 1967 with self.subTest(mode=format(mode, '03o')): 1968 d = self.do_create(recurse=3, dirs=2, files=2) 1969 with d: 1970 # Change files and directories mode recursively. 1971 for root, dirs, files in os.walk(d.name, topdown=False): 1972 for name in files: 1973 os.chmod(os.path.join(root, name), mode) 1974 os.chmod(root, mode) 1975 d.cleanup() 1976 self.assertFalse(os.path.exists(d.name)) 1977 1978 def check_flags(self, flags): 1979 # skip the test if these flags are not supported (ex: FreeBSD 13) 1980 filename = os_helper.TESTFN 1981 try: 1982 open(filename, "w").close() 1983 try: 1984 os.chflags(filename, flags) 1985 except OSError as exc: 1986 # "OSError: [Errno 45] Operation not supported" 1987 self.skipTest(f"chflags() doesn't support flags " 1988 f"{flags:#b}: {exc}") 1989 else: 1990 os.chflags(filename, 0) 1991 finally: 1992 os_helper.unlink(filename) 1993 1994 @unittest.skipUnless(hasattr(os, 'chflags'), 'requires os.chflags') 1995 def test_flags(self): 1996 flags = stat.UF_IMMUTABLE | stat.UF_NOUNLINK 1997 self.check_flags(flags) 1998 1999 d = self.do_create(recurse=3, dirs=2, files=2) 2000 with d: 2001 # Change files and directories flags recursively. 2002 for root, dirs, files in os.walk(d.name, topdown=False): 2003 for name in files: 2004 os.chflags(os.path.join(root, name), flags) 2005 os.chflags(root, flags) 2006 d.cleanup() 2007 self.assertFalse(os.path.exists(d.name)) 2008 2009 def test_delete_false(self): 2010 with tempfile.TemporaryDirectory(delete=False) as working_dir: 2011 pass 2012 self.assertTrue(os.path.exists(working_dir)) 2013 shutil.rmtree(working_dir) 2014 2015if __name__ == "__main__": 2016 unittest.main() 2017