1# tempfile.py unit tests. 2import tempfile 3import errno 4import io 5import os 6import pathlib 7import sys 8import re 9import warnings 10import contextlib 11import stat 12import types 13import weakref 14import subprocess 15from unittest import mock 16 17import unittest 18from test import support 19from test.support import os_helper 20from test.support import script_helper 21from test.support import warnings_helper 22 23 24has_textmode = (tempfile._text_openflags != tempfile._bin_openflags) 25has_spawnl = hasattr(os, 'spawnl') 26 27# TEST_FILES may need to be tweaked for systems depending on the maximum 28# number of files that can be opened at one time (see ulimit -n) 29if sys.platform.startswith('openbsd'): 30 TEST_FILES = 48 31else: 32 TEST_FILES = 100 33 34# This is organized as one test for each chunk of code in tempfile.py, 35# in order of their appearance in the file. Testing which requires 36# threads is not done here. 37 38class TestLowLevelInternals(unittest.TestCase): 39 def test_infer_return_type_singles(self): 40 self.assertIs(str, tempfile._infer_return_type('')) 41 self.assertIs(bytes, tempfile._infer_return_type(b'')) 42 self.assertIs(str, tempfile._infer_return_type(None)) 43 44 def test_infer_return_type_multiples(self): 45 self.assertIs(str, tempfile._infer_return_type('', '')) 46 self.assertIs(bytes, tempfile._infer_return_type(b'', b'')) 47 with self.assertRaises(TypeError): 48 tempfile._infer_return_type('', b'') 49 with self.assertRaises(TypeError): 50 tempfile._infer_return_type(b'', '') 51 52 def test_infer_return_type_multiples_and_none(self): 53 self.assertIs(str, tempfile._infer_return_type(None, '')) 54 self.assertIs(str, tempfile._infer_return_type('', None)) 55 self.assertIs(str, tempfile._infer_return_type(None, None)) 56 self.assertIs(bytes, tempfile._infer_return_type(b'', None)) 57 self.assertIs(bytes, tempfile._infer_return_type(None, b'')) 58 with self.assertRaises(TypeError): 59 tempfile._infer_return_type('', None, b'') 60 with self.assertRaises(TypeError): 61 tempfile._infer_return_type(b'', None, '') 62 63 def test_infer_return_type_pathlib(self): 64 self.assertIs(str, tempfile._infer_return_type(pathlib.Path('/'))) 65 66 def test_infer_return_type_pathlike(self): 67 class Path: 68 def __init__(self, path): 69 self.path = path 70 71 def __fspath__(self): 72 return self.path 73 74 self.assertIs(str, tempfile._infer_return_type(Path('/'))) 75 self.assertIs(bytes, tempfile._infer_return_type(Path(b'/'))) 76 self.assertIs(str, tempfile._infer_return_type('', Path(''))) 77 self.assertIs(bytes, tempfile._infer_return_type(b'', Path(b''))) 78 self.assertIs(bytes, tempfile._infer_return_type(None, Path(b''))) 79 self.assertIs(str, tempfile._infer_return_type(None, Path(''))) 80 81 with self.assertRaises(TypeError): 82 tempfile._infer_return_type('', Path(b'')) 83 with self.assertRaises(TypeError): 84 tempfile._infer_return_type(b'', Path('')) 85 86# Common functionality. 87 88class BaseTestCase(unittest.TestCase): 89 90 str_check = re.compile(r"^[a-z0-9_-]{8}$") 91 b_check = re.compile(br"^[a-z0-9_-]{8}$") 92 93 def setUp(self): 94 self.enterContext(warnings_helper.check_warnings()) 95 warnings.filterwarnings("ignore", category=RuntimeWarning, 96 message="mktemp", module=__name__) 97 98 def nameCheck(self, name, dir, pre, suf): 99 (ndir, nbase) = os.path.split(name) 100 npre = nbase[:len(pre)] 101 nsuf = nbase[len(nbase)-len(suf):] 102 103 if dir is not None: 104 self.assertIs( 105 type(name), 106 str 107 if type(dir) is str or isinstance(dir, os.PathLike) else 108 bytes, 109 "unexpected return type", 110 ) 111 if pre is not None: 112 self.assertIs(type(name), str if type(pre) is str else bytes, 113 "unexpected return type") 114 if suf is not None: 115 self.assertIs(type(name), str if type(suf) is str else bytes, 116 "unexpected return type") 117 if (dir, pre, suf) == (None, None, None): 118 self.assertIs(type(name), str, "default return type must be str") 119 120 # check for equality of the absolute paths! 121 self.assertEqual(os.path.abspath(ndir), os.path.abspath(dir), 122 "file %r not in directory %r" % (name, dir)) 123 self.assertEqual(npre, pre, 124 "file %r does not begin with %r" % (nbase, pre)) 125 self.assertEqual(nsuf, suf, 126 "file %r does not end with %r" % (nbase, suf)) 127 128 nbase = nbase[len(pre):len(nbase)-len(suf)] 129 check = self.str_check if isinstance(nbase, str) else self.b_check 130 self.assertTrue(check.match(nbase), 131 "random characters %r do not match %r" 132 % (nbase, check.pattern)) 133 134 135class TestExports(BaseTestCase): 136 def test_exports(self): 137 # There are no surprising symbols in the tempfile module 138 dict = tempfile.__dict__ 139 140 expected = { 141 "NamedTemporaryFile" : 1, 142 "TemporaryFile" : 1, 143 "mkstemp" : 1, 144 "mkdtemp" : 1, 145 "mktemp" : 1, 146 "TMP_MAX" : 1, 147 "gettempprefix" : 1, 148 "gettempprefixb" : 1, 149 "gettempdir" : 1, 150 "gettempdirb" : 1, 151 "tempdir" : 1, 152 "template" : 1, 153 "SpooledTemporaryFile" : 1, 154 "TemporaryDirectory" : 1, 155 } 156 157 unexp = [] 158 for key in dict: 159 if key[0] != '_' and key not in expected: 160 unexp.append(key) 161 self.assertTrue(len(unexp) == 0, 162 "unexpected keys: %s" % unexp) 163 164 165class TestRandomNameSequence(BaseTestCase): 166 """Test the internal iterator object _RandomNameSequence.""" 167 168 def setUp(self): 169 self.r = tempfile._RandomNameSequence() 170 super().setUp() 171 172 def test_get_eight_char_str(self): 173 # _RandomNameSequence returns a eight-character string 174 s = next(self.r) 175 self.nameCheck(s, '', '', '') 176 177 def test_many(self): 178 # _RandomNameSequence returns no duplicate strings (stochastic) 179 180 dict = {} 181 r = self.r 182 for i in range(TEST_FILES): 183 s = next(r) 184 self.nameCheck(s, '', '', '') 185 self.assertNotIn(s, dict) 186 dict[s] = 1 187 188 def supports_iter(self): 189 # _RandomNameSequence supports the iterator protocol 190 191 i = 0 192 r = self.r 193 for s in r: 194 i += 1 195 if i == 20: 196 break 197 198 @support.requires_fork() 199 def test_process_awareness(self): 200 # ensure that the random source differs between 201 # child and parent. 202 read_fd, write_fd = os.pipe() 203 pid = None 204 try: 205 pid = os.fork() 206 if not pid: 207 # child process 208 os.close(read_fd) 209 os.write(write_fd, next(self.r).encode("ascii")) 210 os.close(write_fd) 211 # bypass the normal exit handlers- leave those to 212 # the parent. 213 os._exit(0) 214 215 # parent process 216 parent_value = next(self.r) 217 child_value = os.read(read_fd, len(parent_value)).decode("ascii") 218 finally: 219 if pid: 220 support.wait_process(pid, exitcode=0) 221 222 os.close(read_fd) 223 os.close(write_fd) 224 self.assertNotEqual(child_value, parent_value) 225 226 227 228class TestCandidateTempdirList(BaseTestCase): 229 """Test the internal function _candidate_tempdir_list.""" 230 231 def test_nonempty_list(self): 232 # _candidate_tempdir_list returns a nonempty list of strings 233 234 cand = tempfile._candidate_tempdir_list() 235 236 self.assertFalse(len(cand) == 0) 237 for c in cand: 238 self.assertIsInstance(c, str) 239 240 def test_wanted_dirs(self): 241 # _candidate_tempdir_list contains the expected directories 242 243 # Make sure the interesting environment variables are all set. 244 with os_helper.EnvironmentVarGuard() as env: 245 for envname in 'TMPDIR', 'TEMP', 'TMP': 246 dirname = os.getenv(envname) 247 if not dirname: 248 env[envname] = os.path.abspath(envname) 249 250 cand = tempfile._candidate_tempdir_list() 251 252 for envname in 'TMPDIR', 'TEMP', 'TMP': 253 dirname = os.getenv(envname) 254 if not dirname: raise ValueError 255 self.assertIn(dirname, cand) 256 257 try: 258 dirname = os.getcwd() 259 except (AttributeError, OSError): 260 dirname = os.curdir 261 262 self.assertIn(dirname, cand) 263 264 # Not practical to try to verify the presence of OS-specific 265 # paths in this list. 266 267 268# We test _get_default_tempdir some more by testing gettempdir. 269 270class TestGetDefaultTempdir(BaseTestCase): 271 """Test _get_default_tempdir().""" 272 273 def test_no_files_left_behind(self): 274 # use a private empty directory 275 with tempfile.TemporaryDirectory() as our_temp_directory: 276 # force _get_default_tempdir() to consider our empty directory 277 def our_candidate_list(): 278 return [our_temp_directory] 279 280 with support.swap_attr(tempfile, "_candidate_tempdir_list", 281 our_candidate_list): 282 # verify our directory is empty after _get_default_tempdir() 283 tempfile._get_default_tempdir() 284 self.assertEqual(os.listdir(our_temp_directory), []) 285 286 def raise_OSError(*args, **kwargs): 287 raise OSError() 288 289 with support.swap_attr(os, "open", raise_OSError): 290 # test again with failing os.open() 291 with self.assertRaises(FileNotFoundError): 292 tempfile._get_default_tempdir() 293 self.assertEqual(os.listdir(our_temp_directory), []) 294 295 with support.swap_attr(os, "write", raise_OSError): 296 # test again with failing os.write() 297 with self.assertRaises(FileNotFoundError): 298 tempfile._get_default_tempdir() 299 self.assertEqual(os.listdir(our_temp_directory), []) 300 301 302class TestGetCandidateNames(BaseTestCase): 303 """Test the internal function _get_candidate_names.""" 304 305 def test_retval(self): 306 # _get_candidate_names returns a _RandomNameSequence object 307 obj = tempfile._get_candidate_names() 308 self.assertIsInstance(obj, tempfile._RandomNameSequence) 309 310 def test_same_thing(self): 311 # _get_candidate_names always returns the same object 312 a = tempfile._get_candidate_names() 313 b = tempfile._get_candidate_names() 314 315 self.assertTrue(a is b) 316 317 318@contextlib.contextmanager 319def _inside_empty_temp_dir(): 320 dir = tempfile.mkdtemp() 321 try: 322 with support.swap_attr(tempfile, 'tempdir', dir): 323 yield 324 finally: 325 os_helper.rmtree(dir) 326 327 328def _mock_candidate_names(*names): 329 return support.swap_attr(tempfile, 330 '_get_candidate_names', 331 lambda: iter(names)) 332 333 334class TestBadTempdir: 335 336 @unittest.skipIf( 337 support.is_emscripten, "Emscripten cannot remove write bits." 338 ) 339 def test_read_only_directory(self): 340 with _inside_empty_temp_dir(): 341 oldmode = mode = os.stat(tempfile.tempdir).st_mode 342 mode &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH) 343 os.chmod(tempfile.tempdir, mode) 344 try: 345 if os.access(tempfile.tempdir, os.W_OK): 346 self.skipTest("can't set the directory read-only") 347 with self.assertRaises(PermissionError): 348 self.make_temp() 349 self.assertEqual(os.listdir(tempfile.tempdir), []) 350 finally: 351 os.chmod(tempfile.tempdir, oldmode) 352 353 def test_nonexisting_directory(self): 354 with _inside_empty_temp_dir(): 355 tempdir = os.path.join(tempfile.tempdir, 'nonexistent') 356 with support.swap_attr(tempfile, 'tempdir', tempdir): 357 with self.assertRaises(FileNotFoundError): 358 self.make_temp() 359 360 def test_non_directory(self): 361 with _inside_empty_temp_dir(): 362 tempdir = os.path.join(tempfile.tempdir, 'file') 363 open(tempdir, 'wb').close() 364 with support.swap_attr(tempfile, 'tempdir', tempdir): 365 with self.assertRaises((NotADirectoryError, FileNotFoundError)): 366 self.make_temp() 367 368 369class TestMkstempInner(TestBadTempdir, BaseTestCase): 370 """Test the internal function _mkstemp_inner.""" 371 372 class mkstemped: 373 _bflags = tempfile._bin_openflags 374 _tflags = tempfile._text_openflags 375 _close = os.close 376 _unlink = os.unlink 377 378 def __init__(self, dir, pre, suf, bin): 379 if bin: flags = self._bflags 380 else: flags = self._tflags 381 382 output_type = tempfile._infer_return_type(dir, pre, suf) 383 (self.fd, self.name) = tempfile._mkstemp_inner(dir, pre, suf, flags, output_type) 384 385 def write(self, str): 386 os.write(self.fd, str) 387 388 def __del__(self): 389 self._close(self.fd) 390 self._unlink(self.name) 391 392 def do_create(self, dir=None, pre=None, suf=None, bin=1): 393 output_type = tempfile._infer_return_type(dir, pre, suf) 394 if dir is None: 395 if output_type is str: 396 dir = tempfile.gettempdir() 397 else: 398 dir = tempfile.gettempdirb() 399 if pre is None: 400 pre = output_type() 401 if suf is None: 402 suf = output_type() 403 file = self.mkstemped(dir, pre, suf, bin) 404 405 self.nameCheck(file.name, dir, pre, suf) 406 return file 407 408 def test_basic(self): 409 # _mkstemp_inner can create files 410 self.do_create().write(b"blat") 411 self.do_create(pre="a").write(b"blat") 412 self.do_create(suf="b").write(b"blat") 413 self.do_create(pre="a", suf="b").write(b"blat") 414 self.do_create(pre="aa", suf=".txt").write(b"blat") 415 416 def test_basic_with_bytes_names(self): 417 # _mkstemp_inner can create files when given name parts all 418 # specified as bytes. 419 dir_b = tempfile.gettempdirb() 420 self.do_create(dir=dir_b, suf=b"").write(b"blat") 421 self.do_create(dir=dir_b, pre=b"a").write(b"blat") 422 self.do_create(dir=dir_b, suf=b"b").write(b"blat") 423 self.do_create(dir=dir_b, pre=b"a", suf=b"b").write(b"blat") 424 self.do_create(dir=dir_b, pre=b"aa", suf=b".txt").write(b"blat") 425 # Can't mix str & binary types in the args. 426 with self.assertRaises(TypeError): 427 self.do_create(dir="", suf=b"").write(b"blat") 428 with self.assertRaises(TypeError): 429 self.do_create(dir=dir_b, pre="").write(b"blat") 430 with self.assertRaises(TypeError): 431 self.do_create(dir=dir_b, pre=b"", suf="").write(b"blat") 432 433 def test_basic_many(self): 434 # _mkstemp_inner can create many files (stochastic) 435 extant = list(range(TEST_FILES)) 436 for i in extant: 437 extant[i] = self.do_create(pre="aa") 438 439 def test_choose_directory(self): 440 # _mkstemp_inner can create files in a user-selected directory 441 dir = tempfile.mkdtemp() 442 try: 443 self.do_create(dir=dir).write(b"blat") 444 self.do_create(dir=pathlib.Path(dir)).write(b"blat") 445 finally: 446 support.gc_collect() # For PyPy or other GCs. 447 os.rmdir(dir) 448 449 @os_helper.skip_unless_working_chmod 450 def test_file_mode(self): 451 # _mkstemp_inner creates files with the proper mode 452 453 file = self.do_create() 454 mode = stat.S_IMODE(os.stat(file.name).st_mode) 455 expected = 0o600 456 if sys.platform == 'win32': 457 # There's no distinction among 'user', 'group' and 'world'; 458 # replicate the 'user' bits. 459 user = expected >> 6 460 expected = user * (1 + 8 + 64) 461 self.assertEqual(mode, expected) 462 463 @unittest.skipUnless(has_spawnl, 'os.spawnl not available') 464 @support.requires_subprocess() 465 def test_noinherit(self): 466 # _mkstemp_inner file handles are not inherited by child processes 467 468 if support.verbose: 469 v="v" 470 else: 471 v="q" 472 473 file = self.do_create() 474 self.assertEqual(os.get_inheritable(file.fd), False) 475 fd = "%d" % file.fd 476 477 try: 478 me = __file__ 479 except NameError: 480 me = sys.argv[0] 481 482 # We have to exec something, so that FD_CLOEXEC will take 483 # effect. The core of this test is therefore in 484 # tf_inherit_check.py, which see. 485 tester = os.path.join(os.path.dirname(os.path.abspath(me)), 486 "tf_inherit_check.py") 487 488 # On Windows a spawn* /path/ with embedded spaces shouldn't be quoted, 489 # but an arg with embedded spaces should be decorated with double 490 # quotes on each end 491 if sys.platform == 'win32': 492 decorated = '"%s"' % sys.executable 493 tester = '"%s"' % tester 494 else: 495 decorated = sys.executable 496 497 retval = os.spawnl(os.P_WAIT, sys.executable, decorated, tester, v, fd) 498 self.assertFalse(retval < 0, 499 "child process caught fatal signal %d" % -retval) 500 self.assertFalse(retval > 0, "child process reports failure %d"%retval) 501 502 @unittest.skipUnless(has_textmode, "text mode not available") 503 def test_textmode(self): 504 # _mkstemp_inner can create files in text mode 505 506 # A text file is truncated at the first Ctrl+Z byte 507 f = self.do_create(bin=0) 508 f.write(b"blat\x1a") 509 f.write(b"extra\n") 510 os.lseek(f.fd, 0, os.SEEK_SET) 511 self.assertEqual(os.read(f.fd, 20), b"blat") 512 513 def make_temp(self): 514 return tempfile._mkstemp_inner(tempfile.gettempdir(), 515 tempfile.gettempprefix(), 516 '', 517 tempfile._bin_openflags, 518 str) 519 520 def test_collision_with_existing_file(self): 521 # _mkstemp_inner tries another name when a file with 522 # the chosen name already exists 523 with _inside_empty_temp_dir(), \ 524 _mock_candidate_names('aaa', 'aaa', 'bbb'): 525 (fd1, name1) = self.make_temp() 526 os.close(fd1) 527 self.assertTrue(name1.endswith('aaa')) 528 529 (fd2, name2) = self.make_temp() 530 os.close(fd2) 531 self.assertTrue(name2.endswith('bbb')) 532 533 def test_collision_with_existing_directory(self): 534 # _mkstemp_inner tries another name when a directory with 535 # the chosen name already exists 536 with _inside_empty_temp_dir(), \ 537 _mock_candidate_names('aaa', 'aaa', 'bbb'): 538 dir = tempfile.mkdtemp() 539 self.assertTrue(dir.endswith('aaa')) 540 541 (fd, name) = self.make_temp() 542 os.close(fd) 543 self.assertTrue(name.endswith('bbb')) 544 545 546class TestGetTempPrefix(BaseTestCase): 547 """Test gettempprefix().""" 548 549 def test_sane_template(self): 550 # gettempprefix returns a nonempty prefix string 551 p = tempfile.gettempprefix() 552 553 self.assertIsInstance(p, str) 554 self.assertGreater(len(p), 0) 555 556 pb = tempfile.gettempprefixb() 557 558 self.assertIsInstance(pb, bytes) 559 self.assertGreater(len(pb), 0) 560 561 def test_usable_template(self): 562 # gettempprefix returns a usable prefix string 563 564 # Create a temp directory, avoiding use of the prefix. 565 # Then attempt to create a file whose name is 566 # prefix + 'xxxxxx.xxx' in that directory. 567 p = tempfile.gettempprefix() + "xxxxxx.xxx" 568 d = tempfile.mkdtemp(prefix="") 569 try: 570 p = os.path.join(d, p) 571 fd = os.open(p, os.O_RDWR | os.O_CREAT) 572 os.close(fd) 573 os.unlink(p) 574 finally: 575 os.rmdir(d) 576 577 578class TestGetTempDir(BaseTestCase): 579 """Test gettempdir().""" 580 581 def test_directory_exists(self): 582 # gettempdir returns a directory which exists 583 584 for d in (tempfile.gettempdir(), tempfile.gettempdirb()): 585 self.assertTrue(os.path.isabs(d) or d == os.curdir, 586 "%r is not an absolute path" % d) 587 self.assertTrue(os.path.isdir(d), 588 "%r is not a directory" % d) 589 590 def test_directory_writable(self): 591 # gettempdir returns a directory writable by the user 592 593 # sneaky: just instantiate a NamedTemporaryFile, which 594 # defaults to writing into the directory returned by 595 # gettempdir. 596 with tempfile.NamedTemporaryFile() as file: 597 file.write(b"blat") 598 599 def test_same_thing(self): 600 # gettempdir always returns the same object 601 a = tempfile.gettempdir() 602 b = tempfile.gettempdir() 603 c = tempfile.gettempdirb() 604 605 self.assertTrue(a is b) 606 self.assertNotEqual(type(a), type(c)) 607 self.assertEqual(a, os.fsdecode(c)) 608 609 def test_case_sensitive(self): 610 # gettempdir should not flatten its case 611 # even on a case-insensitive file system 612 case_sensitive_tempdir = tempfile.mkdtemp("-Temp") 613 _tempdir, tempfile.tempdir = tempfile.tempdir, None 614 try: 615 with os_helper.EnvironmentVarGuard() as env: 616 # Fake the first env var which is checked as a candidate 617 env["TMPDIR"] = case_sensitive_tempdir 618 self.assertEqual(tempfile.gettempdir(), case_sensitive_tempdir) 619 finally: 620 tempfile.tempdir = _tempdir 621 os_helper.rmdir(case_sensitive_tempdir) 622 623 624class TestMkstemp(BaseTestCase): 625 """Test mkstemp().""" 626 627 def do_create(self, dir=None, pre=None, suf=None): 628 output_type = tempfile._infer_return_type(dir, pre, suf) 629 if dir is None: 630 if output_type is str: 631 dir = tempfile.gettempdir() 632 else: 633 dir = tempfile.gettempdirb() 634 if pre is None: 635 pre = output_type() 636 if suf is None: 637 suf = output_type() 638 (fd, name) = tempfile.mkstemp(dir=dir, prefix=pre, suffix=suf) 639 (ndir, nbase) = os.path.split(name) 640 adir = os.path.abspath(dir) 641 self.assertEqual(adir, ndir, 642 "Directory '%s' incorrectly returned as '%s'" % (adir, ndir)) 643 644 try: 645 self.nameCheck(name, dir, pre, suf) 646 finally: 647 os.close(fd) 648 os.unlink(name) 649 650 def test_basic(self): 651 # mkstemp can create files 652 self.do_create() 653 self.do_create(pre="a") 654 self.do_create(suf="b") 655 self.do_create(pre="a", suf="b") 656 self.do_create(pre="aa", suf=".txt") 657 self.do_create(dir=".") 658 659 def test_basic_with_bytes_names(self): 660 # mkstemp can create files when given name parts all 661 # specified as bytes. 662 d = tempfile.gettempdirb() 663 self.do_create(dir=d, suf=b"") 664 self.do_create(dir=d, pre=b"a") 665 self.do_create(dir=d, suf=b"b") 666 self.do_create(dir=d, pre=b"a", suf=b"b") 667 self.do_create(dir=d, pre=b"aa", suf=b".txt") 668 self.do_create(dir=b".") 669 with self.assertRaises(TypeError): 670 self.do_create(dir=".", pre=b"aa", suf=b".txt") 671 with self.assertRaises(TypeError): 672 self.do_create(dir=b".", pre="aa", suf=b".txt") 673 with self.assertRaises(TypeError): 674 self.do_create(dir=b".", pre=b"aa", suf=".txt") 675 676 677 def test_choose_directory(self): 678 # mkstemp can create directories in a user-selected directory 679 dir = tempfile.mkdtemp() 680 try: 681 self.do_create(dir=dir) 682 self.do_create(dir=pathlib.Path(dir)) 683 finally: 684 os.rmdir(dir) 685 686 def test_for_tempdir_is_bytes_issue40701_api_warts(self): 687 orig_tempdir = tempfile.tempdir 688 self.assertIsInstance(tempfile.tempdir, (str, type(None))) 689 try: 690 fd, path = tempfile.mkstemp() 691 os.close(fd) 692 os.unlink(path) 693 self.assertIsInstance(path, str) 694 tempfile.tempdir = tempfile.gettempdirb() 695 self.assertIsInstance(tempfile.tempdir, bytes) 696 self.assertIsInstance(tempfile.gettempdir(), str) 697 self.assertIsInstance(tempfile.gettempdirb(), bytes) 698 fd, path = tempfile.mkstemp() 699 os.close(fd) 700 os.unlink(path) 701 self.assertIsInstance(path, bytes) 702 fd, path = tempfile.mkstemp(suffix='.txt') 703 os.close(fd) 704 os.unlink(path) 705 self.assertIsInstance(path, str) 706 fd, path = tempfile.mkstemp(prefix='test-temp-') 707 os.close(fd) 708 os.unlink(path) 709 self.assertIsInstance(path, str) 710 fd, path = tempfile.mkstemp(dir=tempfile.gettempdir()) 711 os.close(fd) 712 os.unlink(path) 713 self.assertIsInstance(path, str) 714 finally: 715 tempfile.tempdir = orig_tempdir 716 717 718class TestMkdtemp(TestBadTempdir, BaseTestCase): 719 """Test mkdtemp().""" 720 721 def make_temp(self): 722 return tempfile.mkdtemp() 723 724 def do_create(self, dir=None, pre=None, suf=None): 725 output_type = tempfile._infer_return_type(dir, pre, suf) 726 if dir is None: 727 if output_type is str: 728 dir = tempfile.gettempdir() 729 else: 730 dir = tempfile.gettempdirb() 731 if pre is None: 732 pre = output_type() 733 if suf is None: 734 suf = output_type() 735 name = tempfile.mkdtemp(dir=dir, prefix=pre, suffix=suf) 736 737 try: 738 self.nameCheck(name, dir, pre, suf) 739 return name 740 except: 741 os.rmdir(name) 742 raise 743 744 def test_basic(self): 745 # mkdtemp can create directories 746 os.rmdir(self.do_create()) 747 os.rmdir(self.do_create(pre="a")) 748 os.rmdir(self.do_create(suf="b")) 749 os.rmdir(self.do_create(pre="a", suf="b")) 750 os.rmdir(self.do_create(pre="aa", suf=".txt")) 751 752 def test_basic_with_bytes_names(self): 753 # mkdtemp can create directories when given all binary parts 754 d = tempfile.gettempdirb() 755 os.rmdir(self.do_create(dir=d)) 756 os.rmdir(self.do_create(dir=d, pre=b"a")) 757 os.rmdir(self.do_create(dir=d, suf=b"b")) 758 os.rmdir(self.do_create(dir=d, pre=b"a", suf=b"b")) 759 os.rmdir(self.do_create(dir=d, pre=b"aa", suf=b".txt")) 760 with self.assertRaises(TypeError): 761 os.rmdir(self.do_create(dir=d, pre="aa", suf=b".txt")) 762 with self.assertRaises(TypeError): 763 os.rmdir(self.do_create(dir=d, pre=b"aa", suf=".txt")) 764 with self.assertRaises(TypeError): 765 os.rmdir(self.do_create(dir="", pre=b"aa", suf=b".txt")) 766 767 def test_basic_many(self): 768 # mkdtemp can create many directories (stochastic) 769 extant = list(range(TEST_FILES)) 770 try: 771 for i in extant: 772 extant[i] = self.do_create(pre="aa") 773 finally: 774 for i in extant: 775 if(isinstance(i, str)): 776 os.rmdir(i) 777 778 def test_choose_directory(self): 779 # mkdtemp can create directories in a user-selected directory 780 dir = tempfile.mkdtemp() 781 try: 782 os.rmdir(self.do_create(dir=dir)) 783 os.rmdir(self.do_create(dir=pathlib.Path(dir))) 784 finally: 785 os.rmdir(dir) 786 787 @os_helper.skip_unless_working_chmod 788 def test_mode(self): 789 # mkdtemp creates directories with the proper mode 790 791 dir = self.do_create() 792 try: 793 mode = stat.S_IMODE(os.stat(dir).st_mode) 794 mode &= 0o777 # Mask off sticky bits inherited from /tmp 795 expected = 0o700 796 if sys.platform == 'win32': 797 # There's no distinction among 'user', 'group' and 'world'; 798 # replicate the 'user' bits. 799 user = expected >> 6 800 expected = user * (1 + 8 + 64) 801 self.assertEqual(mode, expected) 802 finally: 803 os.rmdir(dir) 804 805 @unittest.skipUnless(os.name == "nt", "Only on Windows.") 806 def test_mode_win32(self): 807 # Use icacls.exe to extract the users with some level of access 808 # Main thing we are testing is that the BUILTIN\Users group has 809 # no access. The exact ACL is going to vary based on which user 810 # is running the test. 811 dir = self.do_create() 812 try: 813 out = subprocess.check_output(["icacls.exe", dir], encoding="oem").casefold() 814 finally: 815 os.rmdir(dir) 816 817 dir = dir.casefold() 818 users = set() 819 found_user = False 820 for line in out.strip().splitlines(): 821 acl = None 822 # First line of result includes our directory 823 if line.startswith(dir): 824 acl = line.removeprefix(dir).strip() 825 elif line and line[:1].isspace(): 826 acl = line.strip() 827 if acl: 828 users.add(acl.partition(":")[0]) 829 830 self.assertNotIn(r"BUILTIN\Users".casefold(), users) 831 832 def test_collision_with_existing_file(self): 833 # mkdtemp tries another name when a file with 834 # the chosen name already exists 835 with _inside_empty_temp_dir(), \ 836 _mock_candidate_names('aaa', 'aaa', 'bbb'): 837 file = tempfile.NamedTemporaryFile(delete=False) 838 file.close() 839 self.assertTrue(file.name.endswith('aaa')) 840 dir = tempfile.mkdtemp() 841 self.assertTrue(dir.endswith('bbb')) 842 843 def test_collision_with_existing_directory(self): 844 # mkdtemp tries another name when a directory with 845 # the chosen name already exists 846 with _inside_empty_temp_dir(), \ 847 _mock_candidate_names('aaa', 'aaa', 'bbb'): 848 dir1 = tempfile.mkdtemp() 849 self.assertTrue(dir1.endswith('aaa')) 850 dir2 = tempfile.mkdtemp() 851 self.assertTrue(dir2.endswith('bbb')) 852 853 def test_for_tempdir_is_bytes_issue40701_api_warts(self): 854 orig_tempdir = tempfile.tempdir 855 self.assertIsInstance(tempfile.tempdir, (str, type(None))) 856 try: 857 path = tempfile.mkdtemp() 858 os.rmdir(path) 859 self.assertIsInstance(path, str) 860 tempfile.tempdir = tempfile.gettempdirb() 861 self.assertIsInstance(tempfile.tempdir, bytes) 862 self.assertIsInstance(tempfile.gettempdir(), str) 863 self.assertIsInstance(tempfile.gettempdirb(), bytes) 864 path = tempfile.mkdtemp() 865 os.rmdir(path) 866 self.assertIsInstance(path, bytes) 867 path = tempfile.mkdtemp(suffix='-dir') 868 os.rmdir(path) 869 self.assertIsInstance(path, str) 870 path = tempfile.mkdtemp(prefix='test-mkdtemp-') 871 os.rmdir(path) 872 self.assertIsInstance(path, str) 873 path = tempfile.mkdtemp(dir=tempfile.gettempdir()) 874 os.rmdir(path) 875 self.assertIsInstance(path, str) 876 finally: 877 tempfile.tempdir = orig_tempdir 878 879 880class TestMktemp(BaseTestCase): 881 """Test mktemp().""" 882 883 # For safety, all use of mktemp must occur in a private directory. 884 # We must also suppress the RuntimeWarning it generates. 885 def setUp(self): 886 self.dir = tempfile.mkdtemp() 887 super().setUp() 888 889 def tearDown(self): 890 if self.dir: 891 os.rmdir(self.dir) 892 self.dir = None 893 super().tearDown() 894 895 class mktemped: 896 _unlink = os.unlink 897 _bflags = tempfile._bin_openflags 898 899 def __init__(self, dir, pre, suf): 900 self.name = tempfile.mktemp(dir=dir, prefix=pre, suffix=suf) 901 # Create the file. This will raise an exception if it's 902 # mysteriously appeared in the meanwhile. 903 os.close(os.open(self.name, self._bflags, 0o600)) 904 905 def __del__(self): 906 self._unlink(self.name) 907 908 def do_create(self, pre="", suf=""): 909 file = self.mktemped(self.dir, pre, suf) 910 911 self.nameCheck(file.name, self.dir, pre, suf) 912 return file 913 914 def test_basic(self): 915 # mktemp can choose usable file names 916 self.do_create() 917 self.do_create(pre="a") 918 self.do_create(suf="b") 919 self.do_create(pre="a", suf="b") 920 self.do_create(pre="aa", suf=".txt") 921 922 def test_many(self): 923 # mktemp can choose many usable file names (stochastic) 924 extant = list(range(TEST_FILES)) 925 for i in extant: 926 extant[i] = self.do_create(pre="aa") 927 del extant 928 support.gc_collect() # For PyPy or other GCs. 929 930## def test_warning(self): 931## # mktemp issues a warning when used 932## warnings.filterwarnings("error", 933## category=RuntimeWarning, 934## message="mktemp") 935## self.assertRaises(RuntimeWarning, 936## tempfile.mktemp, dir=self.dir) 937 938 939# We test _TemporaryFileWrapper by testing NamedTemporaryFile. 940 941 942class TestNamedTemporaryFile(BaseTestCase): 943 """Test NamedTemporaryFile().""" 944 945 def do_create(self, dir=None, pre="", suf="", delete=True): 946 if dir is None: 947 dir = tempfile.gettempdir() 948 file = tempfile.NamedTemporaryFile(dir=dir, prefix=pre, suffix=suf, 949 delete=delete) 950 951 self.nameCheck(file.name, dir, pre, suf) 952 return file 953 954 955 def test_basic(self): 956 # NamedTemporaryFile can create files 957 self.do_create() 958 self.do_create(pre="a") 959 self.do_create(suf="b") 960 self.do_create(pre="a", suf="b") 961 self.do_create(pre="aa", suf=".txt") 962 963 def test_method_lookup(self): 964 # Issue #18879: Looking up a temporary file method should keep it 965 # alive long enough. 966 f = self.do_create() 967 wr = weakref.ref(f) 968 write = f.write 969 write2 = f.write 970 del f 971 write(b'foo') 972 del write 973 write2(b'bar') 974 del write2 975 if support.check_impl_detail(cpython=True): 976 # No reference cycle was created. 977 self.assertIsNone(wr()) 978 979 def test_iter(self): 980 # Issue #23700: getting iterator from a temporary file should keep 981 # it alive as long as it's being iterated over 982 lines = [b'spam\n', b'eggs\n', b'beans\n'] 983 def make_file(): 984 f = tempfile.NamedTemporaryFile(mode='w+b') 985 f.write(b''.join(lines)) 986 f.seek(0) 987 return f 988 for i, l in enumerate(make_file()): 989 self.assertEqual(l, lines[i]) 990 self.assertEqual(i, len(lines) - 1) 991 992 def test_creates_named(self): 993 # NamedTemporaryFile creates files with names 994 f = tempfile.NamedTemporaryFile() 995 self.assertTrue(os.path.exists(f.name), 996 "NamedTemporaryFile %s does not exist" % f.name) 997 998 def test_del_on_close(self): 999 # A NamedTemporaryFile is deleted when closed 1000 dir = tempfile.mkdtemp() 1001 try: 1002 with tempfile.NamedTemporaryFile(dir=dir) as f: 1003 f.write(b'blat') 1004 self.assertEqual(os.listdir(dir), []) 1005 self.assertFalse(os.path.exists(f.name), 1006 "NamedTemporaryFile %s exists after close" % f.name) 1007 finally: 1008 os.rmdir(dir) 1009 1010 def test_dis_del_on_close(self): 1011 # Tests that delete-on-close can be disabled 1012 dir = tempfile.mkdtemp() 1013 tmp = None 1014 try: 1015 f = tempfile.NamedTemporaryFile(dir=dir, delete=False) 1016 tmp = f.name 1017 f.write(b'blat') 1018 f.close() 1019 self.assertTrue(os.path.exists(f.name), 1020 "NamedTemporaryFile %s missing after close" % f.name) 1021 finally: 1022 if tmp is not None: 1023 os.unlink(tmp) 1024 os.rmdir(dir) 1025 1026 def test_multiple_close(self): 1027 # A NamedTemporaryFile can be closed many times without error 1028 f = tempfile.NamedTemporaryFile() 1029 f.write(b'abc\n') 1030 f.close() 1031 f.close() 1032 f.close() 1033 1034 def test_context_manager(self): 1035 # A NamedTemporaryFile can be used as a context manager 1036 with tempfile.NamedTemporaryFile() as f: 1037 self.assertTrue(os.path.exists(f.name)) 1038 self.assertFalse(os.path.exists(f.name)) 1039 def use_closed(): 1040 with f: 1041 pass 1042 self.assertRaises(ValueError, use_closed) 1043 1044 def test_bad_mode(self): 1045 dir = tempfile.mkdtemp() 1046 self.addCleanup(os_helper.rmtree, dir) 1047 with self.assertRaises(ValueError): 1048 tempfile.NamedTemporaryFile(mode='wr', dir=dir) 1049 with self.assertRaises(TypeError): 1050 tempfile.NamedTemporaryFile(mode=2, dir=dir) 1051 self.assertEqual(os.listdir(dir), []) 1052 1053 def test_bad_encoding(self): 1054 dir = tempfile.mkdtemp() 1055 self.addCleanup(os_helper.rmtree, dir) 1056 with self.assertRaises(LookupError): 1057 tempfile.NamedTemporaryFile('w', encoding='bad-encoding', dir=dir) 1058 self.assertEqual(os.listdir(dir), []) 1059 1060 def test_unexpected_error(self): 1061 dir = tempfile.mkdtemp() 1062 self.addCleanup(os_helper.rmtree, dir) 1063 with mock.patch('tempfile._TemporaryFileWrapper') as mock_ntf, \ 1064 mock.patch('io.open', mock.mock_open()) as mock_open: 1065 mock_ntf.side_effect = KeyboardInterrupt() 1066 with self.assertRaises(KeyboardInterrupt): 1067 tempfile.NamedTemporaryFile(dir=dir) 1068 mock_open().close.assert_called() 1069 self.assertEqual(os.listdir(dir), []) 1070 1071 # How to test the mode and bufsize parameters? 1072 1073class TestSpooledTemporaryFile(BaseTestCase): 1074 """Test SpooledTemporaryFile().""" 1075 1076 def do_create(self, max_size=0, dir=None, pre="", suf=""): 1077 if dir is None: 1078 dir = tempfile.gettempdir() 1079 file = tempfile.SpooledTemporaryFile(max_size=max_size, dir=dir, prefix=pre, suffix=suf) 1080 1081 return file 1082 1083 1084 def test_basic(self): 1085 # SpooledTemporaryFile can create files 1086 f = self.do_create() 1087 self.assertFalse(f._rolled) 1088 f = self.do_create(max_size=100, pre="a", suf=".txt") 1089 self.assertFalse(f._rolled) 1090 1091 def test_is_iobase(self): 1092 # SpooledTemporaryFile should implement io.IOBase 1093 self.assertIsInstance(self.do_create(), io.IOBase) 1094 1095 def test_iobase_interface(self): 1096 # SpooledTemporaryFile should implement the io.IOBase interface. 1097 # Ensure it has all the required methods and properties. 1098 iobase_attrs = { 1099 # From IOBase 1100 'fileno', 'seek', 'truncate', 'close', 'closed', '__enter__', 1101 '__exit__', 'flush', 'isatty', '__iter__', '__next__', 'readable', 1102 'readline', 'readlines', 'seekable', 'tell', 'writable', 1103 'writelines', 1104 # From BufferedIOBase (binary mode) and TextIOBase (text mode) 1105 'detach', 'read', 'read1', 'write', 'readinto', 'readinto1', 1106 'encoding', 'errors', 'newlines', 1107 } 1108 spooledtempfile_attrs = set(dir(tempfile.SpooledTemporaryFile)) 1109 missing_attrs = iobase_attrs - spooledtempfile_attrs 1110 self.assertFalse( 1111 missing_attrs, 1112 'SpooledTemporaryFile missing attributes from IOBase/BufferedIOBase/TextIOBase' 1113 ) 1114 1115 def test_del_on_close(self): 1116 # A SpooledTemporaryFile is deleted when closed 1117 dir = tempfile.mkdtemp() 1118 try: 1119 f = tempfile.SpooledTemporaryFile(max_size=10, dir=dir) 1120 self.assertFalse(f._rolled) 1121 f.write(b'blat ' * 5) 1122 self.assertTrue(f._rolled) 1123 filename = f.name 1124 f.close() 1125 self.assertEqual(os.listdir(dir), []) 1126 if not isinstance(filename, int): 1127 self.assertFalse(os.path.exists(filename), 1128 "SpooledTemporaryFile %s exists after close" % filename) 1129 finally: 1130 os.rmdir(dir) 1131 1132 def test_del_unrolled_file(self): 1133 # The unrolled SpooledTemporaryFile should raise a ResourceWarning 1134 # when deleted since the file was not explicitly closed. 1135 f = self.do_create(max_size=10) 1136 f.write(b'foo') 1137 self.assertEqual(f.name, None) # Unrolled so no filename/fd 1138 with self.assertWarns(ResourceWarning): 1139 f.__del__() 1140 1141 @unittest.skipIf( 1142 support.is_emscripten, "Emscripten cannot fstat renamed files." 1143 ) 1144 def test_del_rolled_file(self): 1145 # The rolled file should be deleted when the SpooledTemporaryFile 1146 # object is deleted. This should raise a ResourceWarning since the file 1147 # was not explicitly closed. 1148 f = self.do_create(max_size=2) 1149 f.write(b'foo') 1150 name = f.name # This is a fd on posix+cygwin, a filename everywhere else 1151 self.assertTrue(os.path.exists(name)) 1152 with self.assertWarns(ResourceWarning): 1153 f.__del__() 1154 self.assertFalse( 1155 os.path.exists(name), 1156 "Rolled SpooledTemporaryFile (name=%s) exists after delete" % name 1157 ) 1158 1159 def test_rewrite_small(self): 1160 # A SpooledTemporaryFile can be written to multiple within the max_size 1161 f = self.do_create(max_size=30) 1162 self.assertFalse(f._rolled) 1163 for i in range(5): 1164 f.seek(0, 0) 1165 f.write(b'x' * 20) 1166 self.assertFalse(f._rolled) 1167 1168 def test_write_sequential(self): 1169 # A SpooledTemporaryFile should hold exactly max_size bytes, and roll 1170 # over afterward 1171 f = self.do_create(max_size=30) 1172 self.assertFalse(f._rolled) 1173 f.write(b'x' * 20) 1174 self.assertFalse(f._rolled) 1175 f.write(b'x' * 10) 1176 self.assertFalse(f._rolled) 1177 f.write(b'x') 1178 self.assertTrue(f._rolled) 1179 1180 def test_writelines(self): 1181 # Verify writelines with a SpooledTemporaryFile 1182 f = self.do_create() 1183 f.writelines((b'x', b'y', b'z')) 1184 pos = f.seek(0) 1185 self.assertEqual(pos, 0) 1186 buf = f.read() 1187 self.assertEqual(buf, b'xyz') 1188 1189 def test_writelines_sequential(self): 1190 # A SpooledTemporaryFile should hold exactly max_size bytes, and roll 1191 # over afterward 1192 f = self.do_create(max_size=35) 1193 f.writelines((b'x' * 20, b'x' * 10, b'x' * 5)) 1194 self.assertFalse(f._rolled) 1195 f.write(b'x') 1196 self.assertTrue(f._rolled) 1197 1198 def test_sparse(self): 1199 # A SpooledTemporaryFile that is written late in the file will extend 1200 # when that occurs 1201 f = self.do_create(max_size=30) 1202 self.assertFalse(f._rolled) 1203 pos = f.seek(100, 0) 1204 self.assertEqual(pos, 100) 1205 self.assertFalse(f._rolled) 1206 f.write(b'x') 1207 self.assertTrue(f._rolled) 1208 1209 def test_fileno(self): 1210 # A SpooledTemporaryFile should roll over to a real file on fileno() 1211 f = self.do_create(max_size=30) 1212 self.assertFalse(f._rolled) 1213 self.assertTrue(f.fileno() > 0) 1214 self.assertTrue(f._rolled) 1215 1216 def test_multiple_close_before_rollover(self): 1217 # A SpooledTemporaryFile can be closed many times without error 1218 f = tempfile.SpooledTemporaryFile() 1219 f.write(b'abc\n') 1220 self.assertFalse(f._rolled) 1221 f.close() 1222 f.close() 1223 f.close() 1224 1225 def test_multiple_close_after_rollover(self): 1226 # A SpooledTemporaryFile can be closed many times without error 1227 f = tempfile.SpooledTemporaryFile(max_size=1) 1228 f.write(b'abc\n') 1229 self.assertTrue(f._rolled) 1230 f.close() 1231 f.close() 1232 f.close() 1233 1234 def test_bound_methods(self): 1235 # It should be OK to steal a bound method from a SpooledTemporaryFile 1236 # and use it independently; when the file rolls over, those bound 1237 # methods should continue to function 1238 f = self.do_create(max_size=30) 1239 read = f.read 1240 write = f.write 1241 seek = f.seek 1242 1243 write(b"a" * 35) 1244 write(b"b" * 35) 1245 seek(0, 0) 1246 self.assertEqual(read(70), b'a'*35 + b'b'*35) 1247 1248 def test_properties(self): 1249 f = tempfile.SpooledTemporaryFile(max_size=10) 1250 f.write(b'x' * 10) 1251 self.assertFalse(f._rolled) 1252 self.assertEqual(f.mode, 'w+b') 1253 self.assertIsNone(f.name) 1254 with self.assertRaises(AttributeError): 1255 f.newlines 1256 with self.assertRaises(AttributeError): 1257 f.encoding 1258 with self.assertRaises(AttributeError): 1259 f.errors 1260 1261 f.write(b'x') 1262 self.assertTrue(f._rolled) 1263 self.assertEqual(f.mode, 'rb+') 1264 self.assertIsNotNone(f.name) 1265 with self.assertRaises(AttributeError): 1266 f.newlines 1267 with self.assertRaises(AttributeError): 1268 f.encoding 1269 with self.assertRaises(AttributeError): 1270 f.errors 1271 1272 def test_text_mode(self): 1273 # Creating a SpooledTemporaryFile with a text mode should produce 1274 # a file object reading and writing (Unicode) text strings. 1275 f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10, 1276 encoding="utf-8") 1277 f.write("abc\n") 1278 f.seek(0) 1279 self.assertEqual(f.read(), "abc\n") 1280 f.write("def\n") 1281 f.seek(0) 1282 self.assertEqual(f.read(), "abc\ndef\n") 1283 self.assertFalse(f._rolled) 1284 self.assertEqual(f.mode, 'w+') 1285 self.assertIsNone(f.name) 1286 self.assertEqual(f.newlines, os.linesep) 1287 self.assertEqual(f.encoding, "utf-8") 1288 self.assertEqual(f.errors, "strict") 1289 1290 f.write("xyzzy\n") 1291 f.seek(0) 1292 self.assertEqual(f.read(), "abc\ndef\nxyzzy\n") 1293 # Check that Ctrl+Z doesn't truncate the file 1294 f.write("foo\x1abar\n") 1295 f.seek(0) 1296 self.assertEqual(f.read(), "abc\ndef\nxyzzy\nfoo\x1abar\n") 1297 self.assertTrue(f._rolled) 1298 self.assertEqual(f.mode, 'w+') 1299 self.assertIsNotNone(f.name) 1300 self.assertEqual(f.newlines, os.linesep) 1301 self.assertEqual(f.encoding, "utf-8") 1302 self.assertEqual(f.errors, "strict") 1303 1304 def test_text_newline_and_encoding(self): 1305 f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10, 1306 newline='', encoding='utf-8', 1307 errors='ignore') 1308 f.write("\u039B\r\n") 1309 f.seek(0) 1310 self.assertEqual(f.read(), "\u039B\r\n") 1311 self.assertFalse(f._rolled) 1312 self.assertEqual(f.mode, 'w+') 1313 self.assertIsNone(f.name) 1314 self.assertIsNotNone(f.newlines) 1315 self.assertEqual(f.encoding, "utf-8") 1316 self.assertEqual(f.errors, "ignore") 1317 1318 f.write("\u039C" * 10 + "\r\n") 1319 f.write("\u039D" * 20) 1320 f.seek(0) 1321 self.assertEqual(f.read(), 1322 "\u039B\r\n" + ("\u039C" * 10) + "\r\n" + ("\u039D" * 20)) 1323 self.assertTrue(f._rolled) 1324 self.assertEqual(f.mode, 'w+') 1325 self.assertIsNotNone(f.name) 1326 self.assertIsNotNone(f.newlines) 1327 self.assertEqual(f.encoding, 'utf-8') 1328 self.assertEqual(f.errors, 'ignore') 1329 1330 def test_context_manager_before_rollover(self): 1331 # A SpooledTemporaryFile can be used as a context manager 1332 with tempfile.SpooledTemporaryFile(max_size=1) as f: 1333 self.assertFalse(f._rolled) 1334 self.assertFalse(f.closed) 1335 self.assertTrue(f.closed) 1336 def use_closed(): 1337 with f: 1338 pass 1339 self.assertRaises(ValueError, use_closed) 1340 1341 def test_context_manager_during_rollover(self): 1342 # A SpooledTemporaryFile can be used as a context manager 1343 with tempfile.SpooledTemporaryFile(max_size=1) as f: 1344 self.assertFalse(f._rolled) 1345 f.write(b'abc\n') 1346 f.flush() 1347 self.assertTrue(f._rolled) 1348 self.assertFalse(f.closed) 1349 self.assertTrue(f.closed) 1350 def use_closed(): 1351 with f: 1352 pass 1353 self.assertRaises(ValueError, use_closed) 1354 1355 def test_context_manager_after_rollover(self): 1356 # A SpooledTemporaryFile can be used as a context manager 1357 f = tempfile.SpooledTemporaryFile(max_size=1) 1358 f.write(b'abc\n') 1359 f.flush() 1360 self.assertTrue(f._rolled) 1361 with f: 1362 self.assertFalse(f.closed) 1363 self.assertTrue(f.closed) 1364 def use_closed(): 1365 with f: 1366 pass 1367 self.assertRaises(ValueError, use_closed) 1368 1369 @unittest.skipIf( 1370 support.is_emscripten, "Emscripten cannot fstat renamed files." 1371 ) 1372 def test_truncate_with_size_parameter(self): 1373 # A SpooledTemporaryFile can be truncated to zero size 1374 f = tempfile.SpooledTemporaryFile(max_size=10) 1375 f.write(b'abcdefg\n') 1376 f.seek(0) 1377 f.truncate() 1378 self.assertFalse(f._rolled) 1379 self.assertEqual(f._file.getvalue(), b'') 1380 # A SpooledTemporaryFile can be truncated to a specific size 1381 f = tempfile.SpooledTemporaryFile(max_size=10) 1382 f.write(b'abcdefg\n') 1383 f.truncate(4) 1384 self.assertFalse(f._rolled) 1385 self.assertEqual(f._file.getvalue(), b'abcd') 1386 # A SpooledTemporaryFile rolls over if truncated to large size 1387 f = tempfile.SpooledTemporaryFile(max_size=10) 1388 f.write(b'abcdefg\n') 1389 f.truncate(20) 1390 self.assertTrue(f._rolled) 1391 self.assertEqual(os.fstat(f.fileno()).st_size, 20) 1392 1393 def test_class_getitem(self): 1394 self.assertIsInstance(tempfile.SpooledTemporaryFile[bytes], 1395 types.GenericAlias) 1396 1397if tempfile.NamedTemporaryFile is not tempfile.TemporaryFile: 1398 1399 class TestTemporaryFile(BaseTestCase): 1400 """Test TemporaryFile().""" 1401 1402 def test_basic(self): 1403 # TemporaryFile can create files 1404 # No point in testing the name params - the file has no name. 1405 tempfile.TemporaryFile() 1406 1407 def test_has_no_name(self): 1408 # TemporaryFile creates files with no names (on this system) 1409 dir = tempfile.mkdtemp() 1410 f = tempfile.TemporaryFile(dir=dir) 1411 f.write(b'blat') 1412 1413 # Sneaky: because this file has no name, it should not prevent 1414 # us from removing the directory it was created in. 1415 try: 1416 os.rmdir(dir) 1417 except: 1418 # cleanup 1419 f.close() 1420 os.rmdir(dir) 1421 raise 1422 1423 def test_multiple_close(self): 1424 # A TemporaryFile can be closed many times without error 1425 f = tempfile.TemporaryFile() 1426 f.write(b'abc\n') 1427 f.close() 1428 f.close() 1429 f.close() 1430 1431 # How to test the mode and bufsize parameters? 1432 def test_mode_and_encoding(self): 1433 1434 def roundtrip(input, *args, **kwargs): 1435 with tempfile.TemporaryFile(*args, **kwargs) as fileobj: 1436 fileobj.write(input) 1437 fileobj.seek(0) 1438 self.assertEqual(input, fileobj.read()) 1439 1440 roundtrip(b"1234", "w+b") 1441 roundtrip("abdc\n", "w+") 1442 roundtrip("\u039B", "w+", encoding="utf-16") 1443 roundtrip("foo\r\n", "w+", newline="") 1444 1445 def test_bad_mode(self): 1446 dir = tempfile.mkdtemp() 1447 self.addCleanup(os_helper.rmtree, dir) 1448 with self.assertRaises(ValueError): 1449 tempfile.TemporaryFile(mode='wr', dir=dir) 1450 with self.assertRaises(TypeError): 1451 tempfile.TemporaryFile(mode=2, dir=dir) 1452 self.assertEqual(os.listdir(dir), []) 1453 1454 def test_bad_encoding(self): 1455 dir = tempfile.mkdtemp() 1456 self.addCleanup(os_helper.rmtree, dir) 1457 with self.assertRaises(LookupError): 1458 tempfile.TemporaryFile('w', encoding='bad-encoding', dir=dir) 1459 self.assertEqual(os.listdir(dir), []) 1460 1461 def test_unexpected_error(self): 1462 dir = tempfile.mkdtemp() 1463 self.addCleanup(os_helper.rmtree, dir) 1464 with mock.patch('tempfile._O_TMPFILE_WORKS', False), \ 1465 mock.patch('os.unlink') as mock_unlink, \ 1466 mock.patch('os.open') as mock_open, \ 1467 mock.patch('os.close') as mock_close: 1468 mock_unlink.side_effect = KeyboardInterrupt() 1469 with self.assertRaises(KeyboardInterrupt): 1470 tempfile.TemporaryFile(dir=dir) 1471 mock_close.assert_called() 1472 self.assertEqual(os.listdir(dir), []) 1473 1474 1475# Helper for test_del_on_shutdown 1476class NulledModules: 1477 def __init__(self, *modules): 1478 self.refs = [mod.__dict__ for mod in modules] 1479 self.contents = [ref.copy() for ref in self.refs] 1480 1481 def __enter__(self): 1482 for d in self.refs: 1483 for key in d: 1484 d[key] = None 1485 1486 def __exit__(self, *exc_info): 1487 for d, c in zip(self.refs, self.contents): 1488 d.clear() 1489 d.update(c) 1490 1491 1492class TestTemporaryDirectory(BaseTestCase): 1493 """Test TemporaryDirectory().""" 1494 1495 def do_create(self, dir=None, pre="", suf="", recurse=1, dirs=1, files=1, 1496 ignore_cleanup_errors=False): 1497 if dir is None: 1498 dir = tempfile.gettempdir() 1499 tmp = tempfile.TemporaryDirectory( 1500 dir=dir, prefix=pre, suffix=suf, 1501 ignore_cleanup_errors=ignore_cleanup_errors) 1502 self.nameCheck(tmp.name, dir, pre, suf) 1503 self.do_create2(tmp.name, recurse, dirs, files) 1504 return tmp 1505 1506 def do_create2(self, path, recurse=1, dirs=1, files=1): 1507 # Create subdirectories and some files 1508 if recurse: 1509 for i in range(dirs): 1510 name = os.path.join(path, "dir%d" % i) 1511 os.mkdir(name) 1512 self.do_create2(name, recurse-1, dirs, files) 1513 for i in range(files): 1514 with open(os.path.join(path, "test%d.txt" % i), "wb") as f: 1515 f.write(b"Hello world!") 1516 1517 def test_mkdtemp_failure(self): 1518 # Check no additional exception if mkdtemp fails 1519 # Previously would raise AttributeError instead 1520 # (noted as part of Issue #10188) 1521 with tempfile.TemporaryDirectory() as nonexistent: 1522 pass 1523 with self.assertRaises(FileNotFoundError) as cm: 1524 tempfile.TemporaryDirectory(dir=nonexistent) 1525 self.assertEqual(cm.exception.errno, errno.ENOENT) 1526 1527 def test_explicit_cleanup(self): 1528 # A TemporaryDirectory is deleted when cleaned up 1529 dir = tempfile.mkdtemp() 1530 try: 1531 d = self.do_create(dir=dir) 1532 self.assertTrue(os.path.exists(d.name), 1533 "TemporaryDirectory %s does not exist" % d.name) 1534 d.cleanup() 1535 self.assertFalse(os.path.exists(d.name), 1536 "TemporaryDirectory %s exists after cleanup" % d.name) 1537 finally: 1538 os.rmdir(dir) 1539 1540 def test_explict_cleanup_ignore_errors(self): 1541 """Test that cleanup doesn't return an error when ignoring them.""" 1542 with tempfile.TemporaryDirectory() as working_dir: 1543 temp_dir = self.do_create( 1544 dir=working_dir, ignore_cleanup_errors=True) 1545 temp_path = pathlib.Path(temp_dir.name) 1546 self.assertTrue(temp_path.exists(), 1547 f"TemporaryDirectory {temp_path!s} does not exist") 1548 with open(temp_path / "a_file.txt", "w+t") as open_file: 1549 open_file.write("Hello world!\n") 1550 temp_dir.cleanup() 1551 self.assertEqual(len(list(temp_path.glob("*"))), 1552 int(sys.platform.startswith("win")), 1553 "Unexpected number of files in " 1554 f"TemporaryDirectory {temp_path!s}") 1555 self.assertEqual( 1556 temp_path.exists(), 1557 sys.platform.startswith("win"), 1558 f"TemporaryDirectory {temp_path!s} existence state unexpected") 1559 temp_dir.cleanup() 1560 self.assertFalse( 1561 temp_path.exists(), 1562 f"TemporaryDirectory {temp_path!s} exists after cleanup") 1563 1564 @os_helper.skip_unless_symlink 1565 def test_cleanup_with_symlink_to_a_directory(self): 1566 # cleanup() should not follow symlinks to directories (issue #12464) 1567 d1 = self.do_create() 1568 d2 = self.do_create(recurse=0) 1569 1570 # Symlink d1/foo -> d2 1571 os.symlink(d2.name, os.path.join(d1.name, "foo")) 1572 1573 # This call to cleanup() should not follow the "foo" symlink 1574 d1.cleanup() 1575 1576 self.assertFalse(os.path.exists(d1.name), 1577 "TemporaryDirectory %s exists after cleanup" % d1.name) 1578 self.assertTrue(os.path.exists(d2.name), 1579 "Directory pointed to by a symlink was deleted") 1580 self.assertEqual(os.listdir(d2.name), ['test0.txt'], 1581 "Contents of the directory pointed to by a symlink " 1582 "were deleted") 1583 d2.cleanup() 1584 1585 @os_helper.skip_unless_symlink 1586 def test_cleanup_with_symlink_modes(self): 1587 # cleanup() should not follow symlinks when fixing mode bits (#91133) 1588 with self.do_create(recurse=0) as d2: 1589 file1 = os.path.join(d2, 'file1') 1590 open(file1, 'wb').close() 1591 dir1 = os.path.join(d2, 'dir1') 1592 os.mkdir(dir1) 1593 for mode in range(8): 1594 mode <<= 6 1595 with self.subTest(mode=format(mode, '03o')): 1596 def test(target, target_is_directory): 1597 d1 = self.do_create(recurse=0) 1598 symlink = os.path.join(d1.name, 'symlink') 1599 os.symlink(target, symlink, 1600 target_is_directory=target_is_directory) 1601 try: 1602 os.chmod(symlink, mode, follow_symlinks=False) 1603 except NotImplementedError: 1604 pass 1605 try: 1606 os.chmod(symlink, mode) 1607 except FileNotFoundError: 1608 pass 1609 os.chmod(d1.name, mode) 1610 d1.cleanup() 1611 self.assertFalse(os.path.exists(d1.name)) 1612 1613 with self.subTest('nonexisting file'): 1614 test('nonexisting', target_is_directory=False) 1615 with self.subTest('nonexisting dir'): 1616 test('nonexisting', target_is_directory=True) 1617 1618 with self.subTest('existing file'): 1619 os.chmod(file1, mode) 1620 old_mode = os.stat(file1).st_mode 1621 test(file1, target_is_directory=False) 1622 new_mode = os.stat(file1).st_mode 1623 self.assertEqual(new_mode, old_mode, 1624 '%03o != %03o' % (new_mode, old_mode)) 1625 1626 with self.subTest('existing dir'): 1627 os.chmod(dir1, mode) 1628 old_mode = os.stat(dir1).st_mode 1629 test(dir1, target_is_directory=True) 1630 new_mode = os.stat(dir1).st_mode 1631 self.assertEqual(new_mode, old_mode, 1632 '%03o != %03o' % (new_mode, old_mode)) 1633 1634 @unittest.skipUnless(hasattr(os, 'chflags'), 'requires os.chflags') 1635 @os_helper.skip_unless_symlink 1636 def test_cleanup_with_symlink_flags(self): 1637 # cleanup() should not follow symlinks when fixing flags (#91133) 1638 flags = stat.UF_IMMUTABLE | stat.UF_NOUNLINK 1639 self.check_flags(flags) 1640 1641 with self.do_create(recurse=0) as d2: 1642 file1 = os.path.join(d2, 'file1') 1643 open(file1, 'wb').close() 1644 dir1 = os.path.join(d2, 'dir1') 1645 os.mkdir(dir1) 1646 def test(target, target_is_directory): 1647 d1 = self.do_create(recurse=0) 1648 symlink = os.path.join(d1.name, 'symlink') 1649 os.symlink(target, symlink, 1650 target_is_directory=target_is_directory) 1651 try: 1652 os.chflags(symlink, flags, follow_symlinks=False) 1653 except NotImplementedError: 1654 pass 1655 try: 1656 os.chflags(symlink, flags) 1657 except FileNotFoundError: 1658 pass 1659 os.chflags(d1.name, flags) 1660 d1.cleanup() 1661 self.assertFalse(os.path.exists(d1.name)) 1662 1663 with self.subTest('nonexisting file'): 1664 test('nonexisting', target_is_directory=False) 1665 with self.subTest('nonexisting dir'): 1666 test('nonexisting', target_is_directory=True) 1667 1668 with self.subTest('existing file'): 1669 os.chflags(file1, flags) 1670 old_flags = os.stat(file1).st_flags 1671 test(file1, target_is_directory=False) 1672 new_flags = os.stat(file1).st_flags 1673 self.assertEqual(new_flags, old_flags) 1674 1675 with self.subTest('existing dir'): 1676 os.chflags(dir1, flags) 1677 old_flags = os.stat(dir1).st_flags 1678 test(dir1, target_is_directory=True) 1679 new_flags = os.stat(dir1).st_flags 1680 self.assertEqual(new_flags, old_flags) 1681 1682 @support.cpython_only 1683 def test_del_on_collection(self): 1684 # A TemporaryDirectory is deleted when garbage collected 1685 dir = tempfile.mkdtemp() 1686 try: 1687 d = self.do_create(dir=dir) 1688 name = d.name 1689 del d # Rely on refcounting to invoke __del__ 1690 self.assertFalse(os.path.exists(name), 1691 "TemporaryDirectory %s exists after __del__" % name) 1692 finally: 1693 os.rmdir(dir) 1694 1695 @support.cpython_only 1696 def test_del_on_collection_ignore_errors(self): 1697 """Test that ignoring errors works when TemporaryDirectory is gced.""" 1698 with tempfile.TemporaryDirectory() as working_dir: 1699 temp_dir = self.do_create( 1700 dir=working_dir, ignore_cleanup_errors=True) 1701 temp_path = pathlib.Path(temp_dir.name) 1702 self.assertTrue(temp_path.exists(), 1703 f"TemporaryDirectory {temp_path!s} does not exist") 1704 with open(temp_path / "a_file.txt", "w+t") as open_file: 1705 open_file.write("Hello world!\n") 1706 del temp_dir 1707 self.assertEqual(len(list(temp_path.glob("*"))), 1708 int(sys.platform.startswith("win")), 1709 "Unexpected number of files in " 1710 f"TemporaryDirectory {temp_path!s}") 1711 self.assertEqual( 1712 temp_path.exists(), 1713 sys.platform.startswith("win"), 1714 f"TemporaryDirectory {temp_path!s} existence state unexpected") 1715 1716 def test_del_on_shutdown(self): 1717 # A TemporaryDirectory may be cleaned up during shutdown 1718 with self.do_create() as dir: 1719 for mod in ('builtins', 'os', 'shutil', 'sys', 'tempfile', 'warnings'): 1720 code = """if True: 1721 import builtins 1722 import os 1723 import shutil 1724 import sys 1725 import tempfile 1726 import warnings 1727 1728 tmp = tempfile.TemporaryDirectory(dir={dir!r}) 1729 sys.stdout.buffer.write(tmp.name.encode()) 1730 1731 tmp2 = os.path.join(tmp.name, 'test_dir') 1732 os.mkdir(tmp2) 1733 with open(os.path.join(tmp2, "test0.txt"), "w") as f: 1734 f.write("Hello world!") 1735 1736 {mod}.tmp = tmp 1737 1738 warnings.filterwarnings("always", category=ResourceWarning) 1739 """.format(dir=dir, mod=mod) 1740 rc, out, err = script_helper.assert_python_ok("-c", code) 1741 tmp_name = out.decode().strip() 1742 self.assertFalse(os.path.exists(tmp_name), 1743 "TemporaryDirectory %s exists after cleanup" % tmp_name) 1744 err = err.decode('utf-8', 'backslashreplace') 1745 self.assertNotIn("Exception ", err) 1746 self.assertIn("ResourceWarning: Implicitly cleaning up", err) 1747 1748 def test_del_on_shutdown_ignore_errors(self): 1749 """Test ignoring errors works when a tempdir is gc'ed on shutdown.""" 1750 with tempfile.TemporaryDirectory() as working_dir: 1751 code = """if True: 1752 import pathlib 1753 import sys 1754 import tempfile 1755 import warnings 1756 1757 temp_dir = tempfile.TemporaryDirectory( 1758 dir={working_dir!r}, ignore_cleanup_errors=True) 1759 sys.stdout.buffer.write(temp_dir.name.encode()) 1760 1761 temp_dir_2 = pathlib.Path(temp_dir.name) / "test_dir" 1762 temp_dir_2.mkdir() 1763 with open(temp_dir_2 / "test0.txt", "w") as test_file: 1764 test_file.write("Hello world!") 1765 open_file = open(temp_dir_2 / "open_file.txt", "w") 1766 open_file.write("Hello world!") 1767 1768 warnings.filterwarnings("always", category=ResourceWarning) 1769 """.format(working_dir=working_dir) 1770 __, out, err = script_helper.assert_python_ok("-c", code) 1771 temp_path = pathlib.Path(out.decode().strip()) 1772 self.assertEqual(len(list(temp_path.glob("*"))), 1773 int(sys.platform.startswith("win")), 1774 "Unexpected number of files in " 1775 f"TemporaryDirectory {temp_path!s}") 1776 self.assertEqual( 1777 temp_path.exists(), 1778 sys.platform.startswith("win"), 1779 f"TemporaryDirectory {temp_path!s} existence state unexpected") 1780 err = err.decode('utf-8', 'backslashreplace') 1781 self.assertNotIn("Exception", err) 1782 self.assertNotIn("Error", err) 1783 self.assertIn("ResourceWarning: Implicitly cleaning up", err) 1784 1785 def test_exit_on_shutdown(self): 1786 # Issue #22427 1787 with self.do_create() as dir: 1788 code = """if True: 1789 import sys 1790 import tempfile 1791 import warnings 1792 1793 def generator(): 1794 with tempfile.TemporaryDirectory(dir={dir!r}) as tmp: 1795 yield tmp 1796 g = generator() 1797 sys.stdout.buffer.write(next(g).encode()) 1798 1799 warnings.filterwarnings("always", category=ResourceWarning) 1800 """.format(dir=dir) 1801 rc, out, err = script_helper.assert_python_ok("-c", code) 1802 tmp_name = out.decode().strip() 1803 self.assertFalse(os.path.exists(tmp_name), 1804 "TemporaryDirectory %s exists after cleanup" % tmp_name) 1805 err = err.decode('utf-8', 'backslashreplace') 1806 self.assertNotIn("Exception ", err) 1807 self.assertIn("ResourceWarning: Implicitly cleaning up", err) 1808 1809 def test_warnings_on_cleanup(self): 1810 # ResourceWarning will be triggered by __del__ 1811 with self.do_create() as dir: 1812 d = self.do_create(dir=dir, recurse=3) 1813 name = d.name 1814 1815 # Check for the resource warning 1816 with warnings_helper.check_warnings(('Implicitly', 1817 ResourceWarning), 1818 quiet=False): 1819 warnings.filterwarnings("always", category=ResourceWarning) 1820 del d 1821 support.gc_collect() 1822 self.assertFalse(os.path.exists(name), 1823 "TemporaryDirectory %s exists after __del__" % name) 1824 1825 def test_multiple_close(self): 1826 # Can be cleaned-up many times without error 1827 d = self.do_create() 1828 d.cleanup() 1829 d.cleanup() 1830 d.cleanup() 1831 1832 def test_context_manager(self): 1833 # Can be used as a context manager 1834 d = self.do_create() 1835 with d as name: 1836 self.assertTrue(os.path.exists(name)) 1837 self.assertEqual(name, d.name) 1838 self.assertFalse(os.path.exists(name)) 1839 1840 def test_modes(self): 1841 for mode in range(8): 1842 mode <<= 6 1843 with self.subTest(mode=format(mode, '03o')): 1844 d = self.do_create(recurse=3, dirs=2, files=2) 1845 with d: 1846 # Change files and directories mode recursively. 1847 for root, dirs, files in os.walk(d.name, topdown=False): 1848 for name in files: 1849 os.chmod(os.path.join(root, name), mode) 1850 os.chmod(root, mode) 1851 d.cleanup() 1852 self.assertFalse(os.path.exists(d.name)) 1853 1854 def check_flags(self, flags): 1855 # skip the test if these flags are not supported (ex: FreeBSD 13) 1856 filename = os_helper.TESTFN 1857 try: 1858 open(filename, "w").close() 1859 try: 1860 os.chflags(filename, flags) 1861 except OSError as exc: 1862 # "OSError: [Errno 45] Operation not supported" 1863 self.skipTest(f"chflags() doesn't support flags " 1864 f"{flags:#b}: {exc}") 1865 else: 1866 os.chflags(filename, 0) 1867 finally: 1868 os_helper.unlink(filename) 1869 1870 @unittest.skipUnless(hasattr(os, 'chflags'), 'requires os.chflags') 1871 def test_flags(self): 1872 flags = stat.UF_IMMUTABLE | stat.UF_NOUNLINK 1873 self.check_flags(flags) 1874 1875 d = self.do_create(recurse=3, dirs=2, files=2) 1876 with d: 1877 # Change files and directories flags recursively. 1878 for root, dirs, files in os.walk(d.name, topdown=False): 1879 for name in files: 1880 os.chflags(os.path.join(root, name), flags) 1881 os.chflags(root, flags) 1882 d.cleanup() 1883 self.assertFalse(os.path.exists(d.name)) 1884 1885 1886if __name__ == "__main__": 1887 unittest.main() 1888