1# Copyright (C) 2003 Python Software Foundation 2 3import unittest 4import unittest.mock 5import shutil 6import tempfile 7import sys 8import stat 9import os 10import os.path 11import errno 12import functools 13import pathlib 14import subprocess 15import random 16import string 17import contextlib 18import io 19from shutil import (make_archive, 20 register_archive_format, unregister_archive_format, 21 get_archive_formats, Error, unpack_archive, 22 register_unpack_format, RegistryError, 23 unregister_unpack_format, get_unpack_formats, 24 SameFileError, _GiveupOnFastCopy) 25import tarfile 26import zipfile 27try: 28 import posix 29except ImportError: 30 posix = None 31 32from test import support 33from test.support import os_helper 34from test.support.os_helper import TESTFN, FakePath 35 36TESTFN2 = TESTFN + "2" 37TESTFN_SRC = TESTFN + "_SRC" 38TESTFN_DST = TESTFN + "_DST" 39MACOS = sys.platform.startswith("darwin") 40SOLARIS = sys.platform.startswith("sunos") 41AIX = sys.platform[:3] == 'aix' 42try: 43 import grp 44 import pwd 45 UID_GID_SUPPORT = True 46except ImportError: 47 UID_GID_SUPPORT = False 48 49try: 50 import _winapi 51except ImportError: 52 _winapi = None 53 54def _fake_rename(*args, **kwargs): 55 # Pretend the destination path is on a different filesystem. 56 raise OSError(getattr(errno, 'EXDEV', 18), "Invalid cross-device link") 57 58def mock_rename(func): 59 @functools.wraps(func) 60 def wrap(*args, **kwargs): 61 try: 62 builtin_rename = os.rename 63 os.rename = _fake_rename 64 return func(*args, **kwargs) 65 finally: 66 os.rename = builtin_rename 67 return wrap 68 69def write_file(path, content, binary=False): 70 """Write *content* to a file located at *path*. 71 72 If *path* is a tuple instead of a string, os.path.join will be used to 73 make a path. If *binary* is true, the file will be opened in binary 74 mode. 75 """ 76 if isinstance(path, tuple): 77 path = os.path.join(*path) 78 mode = 'wb' if binary else 'w' 79 encoding = None if binary else "utf-8" 80 with open(path, mode, encoding=encoding) as fp: 81 fp.write(content) 82 83def write_test_file(path, size): 84 """Create a test file with an arbitrary size and random text content.""" 85 def chunks(total, step): 86 assert total >= step 87 while total > step: 88 yield step 89 total -= step 90 if total: 91 yield total 92 93 bufsize = min(size, 8192) 94 chunk = b"".join([random.choice(string.ascii_letters).encode() 95 for i in range(bufsize)]) 96 with open(path, 'wb') as f: 97 for csize in chunks(size, bufsize): 98 f.write(chunk) 99 assert os.path.getsize(path) == size 100 101def read_file(path, binary=False): 102 """Return contents from a file located at *path*. 103 104 If *path* is a tuple instead of a string, os.path.join will be used to 105 make a path. If *binary* is true, the file will be opened in binary 106 mode. 107 """ 108 if isinstance(path, tuple): 109 path = os.path.join(*path) 110 mode = 'rb' if binary else 'r' 111 encoding = None if binary else "utf-8" 112 with open(path, mode, encoding=encoding) as fp: 113 return fp.read() 114 115def rlistdir(path): 116 res = [] 117 for name in sorted(os.listdir(path)): 118 p = os.path.join(path, name) 119 if os.path.isdir(p) and not os.path.islink(p): 120 res.append(name + '/') 121 for n in rlistdir(p): 122 res.append(name + '/' + n) 123 else: 124 res.append(name) 125 return res 126 127def supports_file2file_sendfile(): 128 # ...apparently Linux and Solaris are the only ones 129 if not hasattr(os, "sendfile"): 130 return False 131 srcname = None 132 dstname = None 133 try: 134 with tempfile.NamedTemporaryFile("wb", dir=os.getcwd(), delete=False) as f: 135 srcname = f.name 136 f.write(b"0123456789") 137 138 with open(srcname, "rb") as src: 139 with tempfile.NamedTemporaryFile("wb", dir=os.getcwd(), delete=False) as dst: 140 dstname = dst.name 141 infd = src.fileno() 142 outfd = dst.fileno() 143 try: 144 os.sendfile(outfd, infd, 0, 2) 145 except OSError: 146 return False 147 else: 148 return True 149 finally: 150 if srcname is not None: 151 os_helper.unlink(srcname) 152 if dstname is not None: 153 os_helper.unlink(dstname) 154 155 156SUPPORTS_SENDFILE = supports_file2file_sendfile() 157 158# AIX 32-bit mode, by default, lacks enough memory for the xz/lzma compiler test 159# The AIX command 'dump -o program' gives XCOFF header information 160# The second word of the last line in the maxdata value 161# when 32-bit maxdata must be greater than 0x1000000 for the xz test to succeed 162def _maxdataOK(): 163 if AIX and sys.maxsize == 2147483647: 164 hdrs=subprocess.getoutput("/usr/bin/dump -o %s" % sys.executable) 165 maxdata=hdrs.split("\n")[-1].split()[1] 166 return int(maxdata,16) >= 0x20000000 167 else: 168 return True 169 170 171class BaseTest: 172 173 def mkdtemp(self, prefix=None): 174 """Create a temporary directory that will be cleaned up. 175 176 Returns the path of the directory. 177 """ 178 d = tempfile.mkdtemp(prefix=prefix, dir=os.getcwd()) 179 self.addCleanup(os_helper.rmtree, d) 180 return d 181 182 183class TestRmTree(BaseTest, unittest.TestCase): 184 185 def test_rmtree_works_on_bytes(self): 186 tmp = self.mkdtemp() 187 victim = os.path.join(tmp, 'killme') 188 os.mkdir(victim) 189 write_file(os.path.join(victim, 'somefile'), 'foo') 190 victim = os.fsencode(victim) 191 self.assertIsInstance(victim, bytes) 192 shutil.rmtree(victim) 193 194 @os_helper.skip_unless_symlink 195 def test_rmtree_fails_on_symlink(self): 196 tmp = self.mkdtemp() 197 dir_ = os.path.join(tmp, 'dir') 198 os.mkdir(dir_) 199 link = os.path.join(tmp, 'link') 200 os.symlink(dir_, link) 201 self.assertRaises(OSError, shutil.rmtree, link) 202 self.assertTrue(os.path.exists(dir_)) 203 self.assertTrue(os.path.lexists(link)) 204 errors = [] 205 def onerror(*args): 206 errors.append(args) 207 shutil.rmtree(link, onerror=onerror) 208 self.assertEqual(len(errors), 1) 209 self.assertIs(errors[0][0], os.path.islink) 210 self.assertEqual(errors[0][1], link) 211 self.assertIsInstance(errors[0][2][1], OSError) 212 213 @os_helper.skip_unless_symlink 214 def test_rmtree_works_on_symlinks(self): 215 tmp = self.mkdtemp() 216 dir1 = os.path.join(tmp, 'dir1') 217 dir2 = os.path.join(dir1, 'dir2') 218 dir3 = os.path.join(tmp, 'dir3') 219 for d in dir1, dir2, dir3: 220 os.mkdir(d) 221 file1 = os.path.join(tmp, 'file1') 222 write_file(file1, 'foo') 223 link1 = os.path.join(dir1, 'link1') 224 os.symlink(dir2, link1) 225 link2 = os.path.join(dir1, 'link2') 226 os.symlink(dir3, link2) 227 link3 = os.path.join(dir1, 'link3') 228 os.symlink(file1, link3) 229 # make sure symlinks are removed but not followed 230 shutil.rmtree(dir1) 231 self.assertFalse(os.path.exists(dir1)) 232 self.assertTrue(os.path.exists(dir3)) 233 self.assertTrue(os.path.exists(file1)) 234 235 @unittest.skipUnless(_winapi, 'only relevant on Windows') 236 def test_rmtree_fails_on_junctions(self): 237 tmp = self.mkdtemp() 238 dir_ = os.path.join(tmp, 'dir') 239 os.mkdir(dir_) 240 link = os.path.join(tmp, 'link') 241 _winapi.CreateJunction(dir_, link) 242 self.addCleanup(os_helper.unlink, link) 243 self.assertRaises(OSError, shutil.rmtree, link) 244 self.assertTrue(os.path.exists(dir_)) 245 self.assertTrue(os.path.lexists(link)) 246 errors = [] 247 def onerror(*args): 248 errors.append(args) 249 shutil.rmtree(link, onerror=onerror) 250 self.assertEqual(len(errors), 1) 251 self.assertIs(errors[0][0], os.path.islink) 252 self.assertEqual(errors[0][1], link) 253 self.assertIsInstance(errors[0][2][1], OSError) 254 255 @unittest.skipUnless(_winapi, 'only relevant on Windows') 256 def test_rmtree_works_on_junctions(self): 257 tmp = self.mkdtemp() 258 dir1 = os.path.join(tmp, 'dir1') 259 dir2 = os.path.join(dir1, 'dir2') 260 dir3 = os.path.join(tmp, 'dir3') 261 for d in dir1, dir2, dir3: 262 os.mkdir(d) 263 file1 = os.path.join(tmp, 'file1') 264 write_file(file1, 'foo') 265 link1 = os.path.join(dir1, 'link1') 266 _winapi.CreateJunction(dir2, link1) 267 link2 = os.path.join(dir1, 'link2') 268 _winapi.CreateJunction(dir3, link2) 269 link3 = os.path.join(dir1, 'link3') 270 _winapi.CreateJunction(file1, link3) 271 # make sure junctions are removed but not followed 272 shutil.rmtree(dir1) 273 self.assertFalse(os.path.exists(dir1)) 274 self.assertTrue(os.path.exists(dir3)) 275 self.assertTrue(os.path.exists(file1)) 276 277 def test_rmtree_errors(self): 278 # filename is guaranteed not to exist 279 filename = tempfile.mktemp(dir=self.mkdtemp()) 280 self.assertRaises(FileNotFoundError, shutil.rmtree, filename) 281 # test that ignore_errors option is honored 282 shutil.rmtree(filename, ignore_errors=True) 283 284 # existing file 285 tmpdir = self.mkdtemp() 286 write_file((tmpdir, "tstfile"), "") 287 filename = os.path.join(tmpdir, "tstfile") 288 with self.assertRaises(NotADirectoryError) as cm: 289 shutil.rmtree(filename) 290 self.assertEqual(cm.exception.filename, filename) 291 self.assertTrue(os.path.exists(filename)) 292 # test that ignore_errors option is honored 293 shutil.rmtree(filename, ignore_errors=True) 294 self.assertTrue(os.path.exists(filename)) 295 errors = [] 296 def onerror(*args): 297 errors.append(args) 298 shutil.rmtree(filename, onerror=onerror) 299 self.assertEqual(len(errors), 2) 300 self.assertIs(errors[0][0], os.scandir) 301 self.assertEqual(errors[0][1], filename) 302 self.assertIsInstance(errors[0][2][1], NotADirectoryError) 303 self.assertEqual(errors[0][2][1].filename, filename) 304 self.assertIs(errors[1][0], os.rmdir) 305 self.assertEqual(errors[1][1], filename) 306 self.assertIsInstance(errors[1][2][1], NotADirectoryError) 307 self.assertEqual(errors[1][2][1].filename, filename) 308 309 310 @unittest.skipIf(sys.platform[:6] == 'cygwin', 311 "This test can't be run on Cygwin (issue #1071513).") 312 @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0, 313 "This test can't be run reliably as root (issue #1076467).") 314 def test_on_error(self): 315 self.errorState = 0 316 os.mkdir(TESTFN) 317 self.addCleanup(shutil.rmtree, TESTFN) 318 319 self.child_file_path = os.path.join(TESTFN, 'a') 320 self.child_dir_path = os.path.join(TESTFN, 'b') 321 os_helper.create_empty_file(self.child_file_path) 322 os.mkdir(self.child_dir_path) 323 old_dir_mode = os.stat(TESTFN).st_mode 324 old_child_file_mode = os.stat(self.child_file_path).st_mode 325 old_child_dir_mode = os.stat(self.child_dir_path).st_mode 326 # Make unwritable. 327 new_mode = stat.S_IREAD|stat.S_IEXEC 328 os.chmod(self.child_file_path, new_mode) 329 os.chmod(self.child_dir_path, new_mode) 330 os.chmod(TESTFN, new_mode) 331 332 self.addCleanup(os.chmod, TESTFN, old_dir_mode) 333 self.addCleanup(os.chmod, self.child_file_path, old_child_file_mode) 334 self.addCleanup(os.chmod, self.child_dir_path, old_child_dir_mode) 335 336 shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror) 337 # Test whether onerror has actually been called. 338 self.assertEqual(self.errorState, 3, 339 "Expected call to onerror function did not happen.") 340 341 def check_args_to_onerror(self, func, arg, exc): 342 # test_rmtree_errors deliberately runs rmtree 343 # on a directory that is chmod 500, which will fail. 344 # This function is run when shutil.rmtree fails. 345 # 99.9% of the time it initially fails to remove 346 # a file in the directory, so the first time through 347 # func is os.remove. 348 # However, some Linux machines running ZFS on 349 # FUSE experienced a failure earlier in the process 350 # at os.listdir. The first failure may legally 351 # be either. 352 if self.errorState < 2: 353 if func is os.unlink: 354 self.assertEqual(arg, self.child_file_path) 355 elif func is os.rmdir: 356 self.assertEqual(arg, self.child_dir_path) 357 else: 358 self.assertIs(func, os.listdir) 359 self.assertIn(arg, [TESTFN, self.child_dir_path]) 360 self.assertTrue(issubclass(exc[0], OSError)) 361 self.errorState += 1 362 else: 363 self.assertEqual(func, os.rmdir) 364 self.assertEqual(arg, TESTFN) 365 self.assertTrue(issubclass(exc[0], OSError)) 366 self.errorState = 3 367 368 def test_rmtree_does_not_choke_on_failing_lstat(self): 369 try: 370 orig_lstat = os.lstat 371 def raiser(fn, *args, **kwargs): 372 if fn != TESTFN: 373 raise OSError() 374 else: 375 return orig_lstat(fn) 376 os.lstat = raiser 377 378 os.mkdir(TESTFN) 379 write_file((TESTFN, 'foo'), 'foo') 380 shutil.rmtree(TESTFN) 381 finally: 382 os.lstat = orig_lstat 383 384 def test_rmtree_uses_safe_fd_version_if_available(self): 385 _use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <= 386 os.supports_dir_fd and 387 os.listdir in os.supports_fd and 388 os.stat in os.supports_follow_symlinks) 389 if _use_fd_functions: 390 self.assertTrue(shutil._use_fd_functions) 391 self.assertTrue(shutil.rmtree.avoids_symlink_attacks) 392 tmp_dir = self.mkdtemp() 393 d = os.path.join(tmp_dir, 'a') 394 os.mkdir(d) 395 try: 396 real_rmtree = shutil._rmtree_safe_fd 397 class Called(Exception): pass 398 def _raiser(*args, **kwargs): 399 raise Called 400 shutil._rmtree_safe_fd = _raiser 401 self.assertRaises(Called, shutil.rmtree, d) 402 finally: 403 shutil._rmtree_safe_fd = real_rmtree 404 else: 405 self.assertFalse(shutil._use_fd_functions) 406 self.assertFalse(shutil.rmtree.avoids_symlink_attacks) 407 408 def test_rmtree_dont_delete_file(self): 409 # When called on a file instead of a directory, don't delete it. 410 handle, path = tempfile.mkstemp(dir=self.mkdtemp()) 411 os.close(handle) 412 self.assertRaises(NotADirectoryError, shutil.rmtree, path) 413 os.remove(path) 414 415 @os_helper.skip_unless_symlink 416 def test_rmtree_on_symlink(self): 417 # bug 1669. 418 os.mkdir(TESTFN) 419 try: 420 src = os.path.join(TESTFN, 'cheese') 421 dst = os.path.join(TESTFN, 'shop') 422 os.mkdir(src) 423 os.symlink(src, dst) 424 self.assertRaises(OSError, shutil.rmtree, dst) 425 shutil.rmtree(dst, ignore_errors=True) 426 finally: 427 shutil.rmtree(TESTFN, ignore_errors=True) 428 429 @unittest.skipUnless(_winapi, 'only relevant on Windows') 430 def test_rmtree_on_junction(self): 431 os.mkdir(TESTFN) 432 try: 433 src = os.path.join(TESTFN, 'cheese') 434 dst = os.path.join(TESTFN, 'shop') 435 os.mkdir(src) 436 open(os.path.join(src, 'spam'), 'wb').close() 437 _winapi.CreateJunction(src, dst) 438 self.assertRaises(OSError, shutil.rmtree, dst) 439 shutil.rmtree(dst, ignore_errors=True) 440 finally: 441 shutil.rmtree(TESTFN, ignore_errors=True) 442 443 444class TestCopyTree(BaseTest, unittest.TestCase): 445 446 def test_copytree_simple(self): 447 src_dir = self.mkdtemp() 448 dst_dir = os.path.join(self.mkdtemp(), 'destination') 449 self.addCleanup(shutil.rmtree, src_dir) 450 self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir)) 451 write_file((src_dir, 'test.txt'), '123') 452 os.mkdir(os.path.join(src_dir, 'test_dir')) 453 write_file((src_dir, 'test_dir', 'test.txt'), '456') 454 455 shutil.copytree(src_dir, dst_dir) 456 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test.txt'))) 457 self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'test_dir'))) 458 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test_dir', 459 'test.txt'))) 460 actual = read_file((dst_dir, 'test.txt')) 461 self.assertEqual(actual, '123') 462 actual = read_file((dst_dir, 'test_dir', 'test.txt')) 463 self.assertEqual(actual, '456') 464 465 def test_copytree_dirs_exist_ok(self): 466 src_dir = self.mkdtemp() 467 dst_dir = self.mkdtemp() 468 self.addCleanup(shutil.rmtree, src_dir) 469 self.addCleanup(shutil.rmtree, dst_dir) 470 471 write_file((src_dir, 'nonexisting.txt'), '123') 472 os.mkdir(os.path.join(src_dir, 'existing_dir')) 473 os.mkdir(os.path.join(dst_dir, 'existing_dir')) 474 write_file((dst_dir, 'existing_dir', 'existing.txt'), 'will be replaced') 475 write_file((src_dir, 'existing_dir', 'existing.txt'), 'has been replaced') 476 477 shutil.copytree(src_dir, dst_dir, dirs_exist_ok=True) 478 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'nonexisting.txt'))) 479 self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'existing_dir'))) 480 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'existing_dir', 481 'existing.txt'))) 482 actual = read_file((dst_dir, 'nonexisting.txt')) 483 self.assertEqual(actual, '123') 484 actual = read_file((dst_dir, 'existing_dir', 'existing.txt')) 485 self.assertEqual(actual, 'has been replaced') 486 487 with self.assertRaises(FileExistsError): 488 shutil.copytree(src_dir, dst_dir, dirs_exist_ok=False) 489 490 @os_helper.skip_unless_symlink 491 def test_copytree_symlinks(self): 492 tmp_dir = self.mkdtemp() 493 src_dir = os.path.join(tmp_dir, 'src') 494 dst_dir = os.path.join(tmp_dir, 'dst') 495 sub_dir = os.path.join(src_dir, 'sub') 496 os.mkdir(src_dir) 497 os.mkdir(sub_dir) 498 write_file((src_dir, 'file.txt'), 'foo') 499 src_link = os.path.join(sub_dir, 'link') 500 dst_link = os.path.join(dst_dir, 'sub/link') 501 os.symlink(os.path.join(src_dir, 'file.txt'), 502 src_link) 503 if hasattr(os, 'lchmod'): 504 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO) 505 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'): 506 os.lchflags(src_link, stat.UF_NODUMP) 507 src_stat = os.lstat(src_link) 508 shutil.copytree(src_dir, dst_dir, symlinks=True) 509 self.assertTrue(os.path.islink(os.path.join(dst_dir, 'sub', 'link'))) 510 actual = os.readlink(os.path.join(dst_dir, 'sub', 'link')) 511 # Bad practice to blindly strip the prefix as it may be required to 512 # correctly refer to the file, but we're only comparing paths here. 513 if os.name == 'nt' and actual.startswith('\\\\?\\'): 514 actual = actual[4:] 515 self.assertEqual(actual, os.path.join(src_dir, 'file.txt')) 516 dst_stat = os.lstat(dst_link) 517 if hasattr(os, 'lchmod'): 518 self.assertEqual(dst_stat.st_mode, src_stat.st_mode) 519 if hasattr(os, 'lchflags'): 520 self.assertEqual(dst_stat.st_flags, src_stat.st_flags) 521 522 def test_copytree_with_exclude(self): 523 # creating data 524 join = os.path.join 525 exists = os.path.exists 526 src_dir = self.mkdtemp() 527 try: 528 dst_dir = join(self.mkdtemp(), 'destination') 529 write_file((src_dir, 'test.txt'), '123') 530 write_file((src_dir, 'test.tmp'), '123') 531 os.mkdir(join(src_dir, 'test_dir')) 532 write_file((src_dir, 'test_dir', 'test.txt'), '456') 533 os.mkdir(join(src_dir, 'test_dir2')) 534 write_file((src_dir, 'test_dir2', 'test.txt'), '456') 535 os.mkdir(join(src_dir, 'test_dir2', 'subdir')) 536 os.mkdir(join(src_dir, 'test_dir2', 'subdir2')) 537 write_file((src_dir, 'test_dir2', 'subdir', 'test.txt'), '456') 538 write_file((src_dir, 'test_dir2', 'subdir2', 'test.py'), '456') 539 540 # testing glob-like patterns 541 try: 542 patterns = shutil.ignore_patterns('*.tmp', 'test_dir2') 543 shutil.copytree(src_dir, dst_dir, ignore=patterns) 544 # checking the result: some elements should not be copied 545 self.assertTrue(exists(join(dst_dir, 'test.txt'))) 546 self.assertFalse(exists(join(dst_dir, 'test.tmp'))) 547 self.assertFalse(exists(join(dst_dir, 'test_dir2'))) 548 finally: 549 shutil.rmtree(dst_dir) 550 try: 551 patterns = shutil.ignore_patterns('*.tmp', 'subdir*') 552 shutil.copytree(src_dir, dst_dir, ignore=patterns) 553 # checking the result: some elements should not be copied 554 self.assertFalse(exists(join(dst_dir, 'test.tmp'))) 555 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2'))) 556 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir'))) 557 finally: 558 shutil.rmtree(dst_dir) 559 560 # testing callable-style 561 try: 562 def _filter(src, names): 563 res = [] 564 for name in names: 565 path = os.path.join(src, name) 566 567 if (os.path.isdir(path) and 568 path.split()[-1] == 'subdir'): 569 res.append(name) 570 elif os.path.splitext(path)[-1] in ('.py'): 571 res.append(name) 572 return res 573 574 shutil.copytree(src_dir, dst_dir, ignore=_filter) 575 576 # checking the result: some elements should not be copied 577 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2', 578 'test.py'))) 579 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir'))) 580 581 finally: 582 shutil.rmtree(dst_dir) 583 finally: 584 shutil.rmtree(src_dir) 585 shutil.rmtree(os.path.dirname(dst_dir)) 586 587 def test_copytree_arg_types_of_ignore(self): 588 join = os.path.join 589 exists = os.path.exists 590 591 tmp_dir = self.mkdtemp() 592 src_dir = join(tmp_dir, "source") 593 594 os.mkdir(join(src_dir)) 595 os.mkdir(join(src_dir, 'test_dir')) 596 os.mkdir(os.path.join(src_dir, 'test_dir', 'subdir')) 597 write_file((src_dir, 'test_dir', 'subdir', 'test.txt'), '456') 598 599 invokations = [] 600 601 def _ignore(src, names): 602 invokations.append(src) 603 self.assertIsInstance(src, str) 604 self.assertIsInstance(names, list) 605 self.assertEqual(len(names), len(set(names))) 606 for name in names: 607 self.assertIsInstance(name, str) 608 return [] 609 610 dst_dir = join(self.mkdtemp(), 'destination') 611 shutil.copytree(src_dir, dst_dir, ignore=_ignore) 612 self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir', 613 'test.txt'))) 614 615 dst_dir = join(self.mkdtemp(), 'destination') 616 shutil.copytree(pathlib.Path(src_dir), dst_dir, ignore=_ignore) 617 self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir', 618 'test.txt'))) 619 620 dst_dir = join(self.mkdtemp(), 'destination') 621 src_dir_entry = list(os.scandir(tmp_dir))[0] 622 self.assertIsInstance(src_dir_entry, os.DirEntry) 623 shutil.copytree(src_dir_entry, dst_dir, ignore=_ignore) 624 self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir', 625 'test.txt'))) 626 627 self.assertEqual(len(invokations), 9) 628 629 def test_copytree_retains_permissions(self): 630 tmp_dir = self.mkdtemp() 631 src_dir = os.path.join(tmp_dir, 'source') 632 os.mkdir(src_dir) 633 dst_dir = os.path.join(tmp_dir, 'destination') 634 self.addCleanup(shutil.rmtree, tmp_dir) 635 636 os.chmod(src_dir, 0o777) 637 write_file((src_dir, 'permissive.txt'), '123') 638 os.chmod(os.path.join(src_dir, 'permissive.txt'), 0o777) 639 write_file((src_dir, 'restrictive.txt'), '456') 640 os.chmod(os.path.join(src_dir, 'restrictive.txt'), 0o600) 641 restrictive_subdir = tempfile.mkdtemp(dir=src_dir) 642 self.addCleanup(os_helper.rmtree, restrictive_subdir) 643 os.chmod(restrictive_subdir, 0o600) 644 645 shutil.copytree(src_dir, dst_dir) 646 self.assertEqual(os.stat(src_dir).st_mode, os.stat(dst_dir).st_mode) 647 self.assertEqual(os.stat(os.path.join(src_dir, 'permissive.txt')).st_mode, 648 os.stat(os.path.join(dst_dir, 'permissive.txt')).st_mode) 649 self.assertEqual(os.stat(os.path.join(src_dir, 'restrictive.txt')).st_mode, 650 os.stat(os.path.join(dst_dir, 'restrictive.txt')).st_mode) 651 restrictive_subdir_dst = os.path.join(dst_dir, 652 os.path.split(restrictive_subdir)[1]) 653 self.assertEqual(os.stat(restrictive_subdir).st_mode, 654 os.stat(restrictive_subdir_dst).st_mode) 655 656 @unittest.mock.patch('os.chmod') 657 def test_copytree_winerror(self, mock_patch): 658 # When copying to VFAT, copystat() raises OSError. On Windows, the 659 # exception object has a meaningful 'winerror' attribute, but not 660 # on other operating systems. Do not assume 'winerror' is set. 661 src_dir = self.mkdtemp() 662 dst_dir = os.path.join(self.mkdtemp(), 'destination') 663 self.addCleanup(shutil.rmtree, src_dir) 664 self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir)) 665 666 mock_patch.side_effect = PermissionError('ka-boom') 667 with self.assertRaises(shutil.Error): 668 shutil.copytree(src_dir, dst_dir) 669 670 def test_copytree_custom_copy_function(self): 671 # See: https://bugs.python.org/issue35648 672 def custom_cpfun(a, b): 673 flag.append(None) 674 self.assertIsInstance(a, str) 675 self.assertIsInstance(b, str) 676 self.assertEqual(a, os.path.join(src, 'foo')) 677 self.assertEqual(b, os.path.join(dst, 'foo')) 678 679 flag = [] 680 src = self.mkdtemp() 681 dst = tempfile.mktemp(dir=self.mkdtemp()) 682 with open(os.path.join(src, 'foo'), 'w', encoding='utf-8') as f: 683 f.close() 684 shutil.copytree(src, dst, copy_function=custom_cpfun) 685 self.assertEqual(len(flag), 1) 686 687 # Issue #3002: copyfile and copytree block indefinitely on named pipes 688 @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()') 689 @os_helper.skip_unless_symlink 690 @unittest.skipIf(sys.platform == "vxworks", 691 "fifo requires special path on VxWorks") 692 def test_copytree_named_pipe(self): 693 os.mkdir(TESTFN) 694 try: 695 subdir = os.path.join(TESTFN, "subdir") 696 os.mkdir(subdir) 697 pipe = os.path.join(subdir, "mypipe") 698 try: 699 os.mkfifo(pipe) 700 except PermissionError as e: 701 self.skipTest('os.mkfifo(): %s' % e) 702 try: 703 shutil.copytree(TESTFN, TESTFN2) 704 except shutil.Error as e: 705 errors = e.args[0] 706 self.assertEqual(len(errors), 1) 707 src, dst, error_msg = errors[0] 708 self.assertEqual("`%s` is a named pipe" % pipe, error_msg) 709 else: 710 self.fail("shutil.Error should have been raised") 711 finally: 712 shutil.rmtree(TESTFN, ignore_errors=True) 713 shutil.rmtree(TESTFN2, ignore_errors=True) 714 715 def test_copytree_special_func(self): 716 src_dir = self.mkdtemp() 717 dst_dir = os.path.join(self.mkdtemp(), 'destination') 718 write_file((src_dir, 'test.txt'), '123') 719 os.mkdir(os.path.join(src_dir, 'test_dir')) 720 write_file((src_dir, 'test_dir', 'test.txt'), '456') 721 722 copied = [] 723 def _copy(src, dst): 724 copied.append((src, dst)) 725 726 shutil.copytree(src_dir, dst_dir, copy_function=_copy) 727 self.assertEqual(len(copied), 2) 728 729 @os_helper.skip_unless_symlink 730 def test_copytree_dangling_symlinks(self): 731 # a dangling symlink raises an error at the end 732 src_dir = self.mkdtemp() 733 dst_dir = os.path.join(self.mkdtemp(), 'destination') 734 os.symlink('IDONTEXIST', os.path.join(src_dir, 'test.txt')) 735 os.mkdir(os.path.join(src_dir, 'test_dir')) 736 write_file((src_dir, 'test_dir', 'test.txt'), '456') 737 self.assertRaises(Error, shutil.copytree, src_dir, dst_dir) 738 739 # a dangling symlink is ignored with the proper flag 740 dst_dir = os.path.join(self.mkdtemp(), 'destination2') 741 shutil.copytree(src_dir, dst_dir, ignore_dangling_symlinks=True) 742 self.assertNotIn('test.txt', os.listdir(dst_dir)) 743 744 # a dangling symlink is copied if symlinks=True 745 dst_dir = os.path.join(self.mkdtemp(), 'destination3') 746 shutil.copytree(src_dir, dst_dir, symlinks=True) 747 self.assertIn('test.txt', os.listdir(dst_dir)) 748 749 @os_helper.skip_unless_symlink 750 def test_copytree_symlink_dir(self): 751 src_dir = self.mkdtemp() 752 dst_dir = os.path.join(self.mkdtemp(), 'destination') 753 os.mkdir(os.path.join(src_dir, 'real_dir')) 754 with open(os.path.join(src_dir, 'real_dir', 'test.txt'), 'wb'): 755 pass 756 os.symlink(os.path.join(src_dir, 'real_dir'), 757 os.path.join(src_dir, 'link_to_dir'), 758 target_is_directory=True) 759 760 shutil.copytree(src_dir, dst_dir, symlinks=False) 761 self.assertFalse(os.path.islink(os.path.join(dst_dir, 'link_to_dir'))) 762 self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir'))) 763 764 dst_dir = os.path.join(self.mkdtemp(), 'destination2') 765 shutil.copytree(src_dir, dst_dir, symlinks=True) 766 self.assertTrue(os.path.islink(os.path.join(dst_dir, 'link_to_dir'))) 767 self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir'))) 768 769 def test_copytree_return_value(self): 770 # copytree returns its destination path. 771 src_dir = self.mkdtemp() 772 dst_dir = src_dir + "dest" 773 self.addCleanup(shutil.rmtree, dst_dir, True) 774 src = os.path.join(src_dir, 'foo') 775 write_file(src, 'foo') 776 rv = shutil.copytree(src_dir, dst_dir) 777 self.assertEqual(['foo'], os.listdir(rv)) 778 779 def test_copytree_subdirectory(self): 780 # copytree where dst is a subdirectory of src, see Issue 38688 781 base_dir = self.mkdtemp() 782 self.addCleanup(shutil.rmtree, base_dir, ignore_errors=True) 783 src_dir = os.path.join(base_dir, "t", "pg") 784 dst_dir = os.path.join(src_dir, "somevendor", "1.0") 785 os.makedirs(src_dir) 786 src = os.path.join(src_dir, 'pol') 787 write_file(src, 'pol') 788 rv = shutil.copytree(src_dir, dst_dir) 789 self.assertEqual(['pol'], os.listdir(rv)) 790 791class TestCopy(BaseTest, unittest.TestCase): 792 793 ### shutil.copymode 794 795 @os_helper.skip_unless_symlink 796 def test_copymode_follow_symlinks(self): 797 tmp_dir = self.mkdtemp() 798 src = os.path.join(tmp_dir, 'foo') 799 dst = os.path.join(tmp_dir, 'bar') 800 src_link = os.path.join(tmp_dir, 'baz') 801 dst_link = os.path.join(tmp_dir, 'quux') 802 write_file(src, 'foo') 803 write_file(dst, 'foo') 804 os.symlink(src, src_link) 805 os.symlink(dst, dst_link) 806 os.chmod(src, stat.S_IRWXU|stat.S_IRWXG) 807 # file to file 808 os.chmod(dst, stat.S_IRWXO) 809 self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 810 shutil.copymode(src, dst) 811 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 812 # On Windows, os.chmod does not follow symlinks (issue #15411) 813 if os.name != 'nt': 814 # follow src link 815 os.chmod(dst, stat.S_IRWXO) 816 shutil.copymode(src_link, dst) 817 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 818 # follow dst link 819 os.chmod(dst, stat.S_IRWXO) 820 shutil.copymode(src, dst_link) 821 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 822 # follow both links 823 os.chmod(dst, stat.S_IRWXO) 824 shutil.copymode(src_link, dst_link) 825 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 826 827 @unittest.skipUnless(hasattr(os, 'lchmod'), 'requires os.lchmod') 828 @os_helper.skip_unless_symlink 829 def test_copymode_symlink_to_symlink(self): 830 tmp_dir = self.mkdtemp() 831 src = os.path.join(tmp_dir, 'foo') 832 dst = os.path.join(tmp_dir, 'bar') 833 src_link = os.path.join(tmp_dir, 'baz') 834 dst_link = os.path.join(tmp_dir, 'quux') 835 write_file(src, 'foo') 836 write_file(dst, 'foo') 837 os.symlink(src, src_link) 838 os.symlink(dst, dst_link) 839 os.chmod(src, stat.S_IRWXU|stat.S_IRWXG) 840 os.chmod(dst, stat.S_IRWXU) 841 os.lchmod(src_link, stat.S_IRWXO|stat.S_IRWXG) 842 # link to link 843 os.lchmod(dst_link, stat.S_IRWXO) 844 shutil.copymode(src_link, dst_link, follow_symlinks=False) 845 self.assertEqual(os.lstat(src_link).st_mode, 846 os.lstat(dst_link).st_mode) 847 self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 848 # src link - use chmod 849 os.lchmod(dst_link, stat.S_IRWXO) 850 shutil.copymode(src_link, dst, follow_symlinks=False) 851 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 852 # dst link - use chmod 853 os.lchmod(dst_link, stat.S_IRWXO) 854 shutil.copymode(src, dst_link, follow_symlinks=False) 855 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 856 857 @unittest.skipIf(hasattr(os, 'lchmod'), 'requires os.lchmod to be missing') 858 @os_helper.skip_unless_symlink 859 def test_copymode_symlink_to_symlink_wo_lchmod(self): 860 tmp_dir = self.mkdtemp() 861 src = os.path.join(tmp_dir, 'foo') 862 dst = os.path.join(tmp_dir, 'bar') 863 src_link = os.path.join(tmp_dir, 'baz') 864 dst_link = os.path.join(tmp_dir, 'quux') 865 write_file(src, 'foo') 866 write_file(dst, 'foo') 867 os.symlink(src, src_link) 868 os.symlink(dst, dst_link) 869 shutil.copymode(src_link, dst_link, follow_symlinks=False) # silent fail 870 871 ### shutil.copystat 872 873 @os_helper.skip_unless_symlink 874 def test_copystat_symlinks(self): 875 tmp_dir = self.mkdtemp() 876 src = os.path.join(tmp_dir, 'foo') 877 dst = os.path.join(tmp_dir, 'bar') 878 src_link = os.path.join(tmp_dir, 'baz') 879 dst_link = os.path.join(tmp_dir, 'qux') 880 write_file(src, 'foo') 881 src_stat = os.stat(src) 882 os.utime(src, (src_stat.st_atime, 883 src_stat.st_mtime - 42.0)) # ensure different mtimes 884 write_file(dst, 'bar') 885 self.assertNotEqual(os.stat(src).st_mtime, os.stat(dst).st_mtime) 886 os.symlink(src, src_link) 887 os.symlink(dst, dst_link) 888 if hasattr(os, 'lchmod'): 889 os.lchmod(src_link, stat.S_IRWXO) 890 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'): 891 os.lchflags(src_link, stat.UF_NODUMP) 892 src_link_stat = os.lstat(src_link) 893 # follow 894 if hasattr(os, 'lchmod'): 895 shutil.copystat(src_link, dst_link, follow_symlinks=True) 896 self.assertNotEqual(src_link_stat.st_mode, os.stat(dst).st_mode) 897 # don't follow 898 shutil.copystat(src_link, dst_link, follow_symlinks=False) 899 dst_link_stat = os.lstat(dst_link) 900 if os.utime in os.supports_follow_symlinks: 901 for attr in 'st_atime', 'st_mtime': 902 # The modification times may be truncated in the new file. 903 self.assertLessEqual(getattr(src_link_stat, attr), 904 getattr(dst_link_stat, attr) + 1) 905 if hasattr(os, 'lchmod'): 906 self.assertEqual(src_link_stat.st_mode, dst_link_stat.st_mode) 907 if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'): 908 self.assertEqual(src_link_stat.st_flags, dst_link_stat.st_flags) 909 # tell to follow but dst is not a link 910 shutil.copystat(src_link, dst, follow_symlinks=False) 911 self.assertTrue(abs(os.stat(src).st_mtime - os.stat(dst).st_mtime) < 912 00000.1) 913 914 @unittest.skipUnless(hasattr(os, 'chflags') and 915 hasattr(errno, 'EOPNOTSUPP') and 916 hasattr(errno, 'ENOTSUP'), 917 "requires os.chflags, EOPNOTSUPP & ENOTSUP") 918 def test_copystat_handles_harmless_chflags_errors(self): 919 tmpdir = self.mkdtemp() 920 file1 = os.path.join(tmpdir, 'file1') 921 file2 = os.path.join(tmpdir, 'file2') 922 write_file(file1, 'xxx') 923 write_file(file2, 'xxx') 924 925 def make_chflags_raiser(err): 926 ex = OSError() 927 928 def _chflags_raiser(path, flags, *, follow_symlinks=True): 929 ex.errno = err 930 raise ex 931 return _chflags_raiser 932 old_chflags = os.chflags 933 try: 934 for err in errno.EOPNOTSUPP, errno.ENOTSUP: 935 os.chflags = make_chflags_raiser(err) 936 shutil.copystat(file1, file2) 937 # assert others errors break it 938 os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP) 939 self.assertRaises(OSError, shutil.copystat, file1, file2) 940 finally: 941 os.chflags = old_chflags 942 943 ### shutil.copyxattr 944 945 @os_helper.skip_unless_xattr 946 def test_copyxattr(self): 947 tmp_dir = self.mkdtemp() 948 src = os.path.join(tmp_dir, 'foo') 949 write_file(src, 'foo') 950 dst = os.path.join(tmp_dir, 'bar') 951 write_file(dst, 'bar') 952 953 # no xattr == no problem 954 shutil._copyxattr(src, dst) 955 # common case 956 os.setxattr(src, 'user.foo', b'42') 957 os.setxattr(src, 'user.bar', b'43') 958 shutil._copyxattr(src, dst) 959 self.assertEqual(sorted(os.listxattr(src)), sorted(os.listxattr(dst))) 960 self.assertEqual( 961 os.getxattr(src, 'user.foo'), 962 os.getxattr(dst, 'user.foo')) 963 # check errors don't affect other attrs 964 os.remove(dst) 965 write_file(dst, 'bar') 966 os_error = OSError(errno.EPERM, 'EPERM') 967 968 def _raise_on_user_foo(fname, attr, val, **kwargs): 969 if attr == 'user.foo': 970 raise os_error 971 else: 972 orig_setxattr(fname, attr, val, **kwargs) 973 try: 974 orig_setxattr = os.setxattr 975 os.setxattr = _raise_on_user_foo 976 shutil._copyxattr(src, dst) 977 self.assertIn('user.bar', os.listxattr(dst)) 978 finally: 979 os.setxattr = orig_setxattr 980 # the source filesystem not supporting xattrs should be ok, too. 981 def _raise_on_src(fname, *, follow_symlinks=True): 982 if fname == src: 983 raise OSError(errno.ENOTSUP, 'Operation not supported') 984 return orig_listxattr(fname, follow_symlinks=follow_symlinks) 985 try: 986 orig_listxattr = os.listxattr 987 os.listxattr = _raise_on_src 988 shutil._copyxattr(src, dst) 989 finally: 990 os.listxattr = orig_listxattr 991 992 # test that shutil.copystat copies xattrs 993 src = os.path.join(tmp_dir, 'the_original') 994 srcro = os.path.join(tmp_dir, 'the_original_ro') 995 write_file(src, src) 996 write_file(srcro, srcro) 997 os.setxattr(src, 'user.the_value', b'fiddly') 998 os.setxattr(srcro, 'user.the_value', b'fiddly') 999 os.chmod(srcro, 0o444) 1000 dst = os.path.join(tmp_dir, 'the_copy') 1001 dstro = os.path.join(tmp_dir, 'the_copy_ro') 1002 write_file(dst, dst) 1003 write_file(dstro, dstro) 1004 shutil.copystat(src, dst) 1005 shutil.copystat(srcro, dstro) 1006 self.assertEqual(os.getxattr(dst, 'user.the_value'), b'fiddly') 1007 self.assertEqual(os.getxattr(dstro, 'user.the_value'), b'fiddly') 1008 1009 @os_helper.skip_unless_symlink 1010 @os_helper.skip_unless_xattr 1011 @unittest.skipUnless(hasattr(os, 'geteuid') and os.geteuid() == 0, 1012 'root privileges required') 1013 def test_copyxattr_symlinks(self): 1014 # On Linux, it's only possible to access non-user xattr for symlinks; 1015 # which in turn require root privileges. This test should be expanded 1016 # as soon as other platforms gain support for extended attributes. 1017 tmp_dir = self.mkdtemp() 1018 src = os.path.join(tmp_dir, 'foo') 1019 src_link = os.path.join(tmp_dir, 'baz') 1020 write_file(src, 'foo') 1021 os.symlink(src, src_link) 1022 os.setxattr(src, 'trusted.foo', b'42') 1023 os.setxattr(src_link, 'trusted.foo', b'43', follow_symlinks=False) 1024 dst = os.path.join(tmp_dir, 'bar') 1025 dst_link = os.path.join(tmp_dir, 'qux') 1026 write_file(dst, 'bar') 1027 os.symlink(dst, dst_link) 1028 shutil._copyxattr(src_link, dst_link, follow_symlinks=False) 1029 self.assertEqual(os.getxattr(dst_link, 'trusted.foo', follow_symlinks=False), b'43') 1030 self.assertRaises(OSError, os.getxattr, dst, 'trusted.foo') 1031 shutil._copyxattr(src_link, dst, follow_symlinks=False) 1032 self.assertEqual(os.getxattr(dst, 'trusted.foo'), b'43') 1033 1034 ### shutil.copy 1035 1036 def _copy_file(self, method): 1037 fname = 'test.txt' 1038 tmpdir = self.mkdtemp() 1039 write_file((tmpdir, fname), 'xxx') 1040 file1 = os.path.join(tmpdir, fname) 1041 tmpdir2 = self.mkdtemp() 1042 method(file1, tmpdir2) 1043 file2 = os.path.join(tmpdir2, fname) 1044 return (file1, file2) 1045 1046 def test_copy(self): 1047 # Ensure that the copied file exists and has the same mode bits. 1048 file1, file2 = self._copy_file(shutil.copy) 1049 self.assertTrue(os.path.exists(file2)) 1050 self.assertEqual(os.stat(file1).st_mode, os.stat(file2).st_mode) 1051 1052 @os_helper.skip_unless_symlink 1053 def test_copy_symlinks(self): 1054 tmp_dir = self.mkdtemp() 1055 src = os.path.join(tmp_dir, 'foo') 1056 dst = os.path.join(tmp_dir, 'bar') 1057 src_link = os.path.join(tmp_dir, 'baz') 1058 write_file(src, 'foo') 1059 os.symlink(src, src_link) 1060 if hasattr(os, 'lchmod'): 1061 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO) 1062 # don't follow 1063 shutil.copy(src_link, dst, follow_symlinks=True) 1064 self.assertFalse(os.path.islink(dst)) 1065 self.assertEqual(read_file(src), read_file(dst)) 1066 os.remove(dst) 1067 # follow 1068 shutil.copy(src_link, dst, follow_symlinks=False) 1069 self.assertTrue(os.path.islink(dst)) 1070 self.assertEqual(os.readlink(dst), os.readlink(src_link)) 1071 if hasattr(os, 'lchmod'): 1072 self.assertEqual(os.lstat(src_link).st_mode, 1073 os.lstat(dst).st_mode) 1074 1075 ### shutil.copy2 1076 1077 @unittest.skipUnless(hasattr(os, 'utime'), 'requires os.utime') 1078 def test_copy2(self): 1079 # Ensure that the copied file exists and has the same mode and 1080 # modification time bits. 1081 file1, file2 = self._copy_file(shutil.copy2) 1082 self.assertTrue(os.path.exists(file2)) 1083 file1_stat = os.stat(file1) 1084 file2_stat = os.stat(file2) 1085 self.assertEqual(file1_stat.st_mode, file2_stat.st_mode) 1086 for attr in 'st_atime', 'st_mtime': 1087 # The modification times may be truncated in the new file. 1088 self.assertLessEqual(getattr(file1_stat, attr), 1089 getattr(file2_stat, attr) + 1) 1090 if hasattr(os, 'chflags') and hasattr(file1_stat, 'st_flags'): 1091 self.assertEqual(getattr(file1_stat, 'st_flags'), 1092 getattr(file2_stat, 'st_flags')) 1093 1094 @os_helper.skip_unless_symlink 1095 def test_copy2_symlinks(self): 1096 tmp_dir = self.mkdtemp() 1097 src = os.path.join(tmp_dir, 'foo') 1098 dst = os.path.join(tmp_dir, 'bar') 1099 src_link = os.path.join(tmp_dir, 'baz') 1100 write_file(src, 'foo') 1101 os.symlink(src, src_link) 1102 if hasattr(os, 'lchmod'): 1103 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO) 1104 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'): 1105 os.lchflags(src_link, stat.UF_NODUMP) 1106 src_stat = os.stat(src) 1107 src_link_stat = os.lstat(src_link) 1108 # follow 1109 shutil.copy2(src_link, dst, follow_symlinks=True) 1110 self.assertFalse(os.path.islink(dst)) 1111 self.assertEqual(read_file(src), read_file(dst)) 1112 os.remove(dst) 1113 # don't follow 1114 shutil.copy2(src_link, dst, follow_symlinks=False) 1115 self.assertTrue(os.path.islink(dst)) 1116 self.assertEqual(os.readlink(dst), os.readlink(src_link)) 1117 dst_stat = os.lstat(dst) 1118 if os.utime in os.supports_follow_symlinks: 1119 for attr in 'st_atime', 'st_mtime': 1120 # The modification times may be truncated in the new file. 1121 self.assertLessEqual(getattr(src_link_stat, attr), 1122 getattr(dst_stat, attr) + 1) 1123 if hasattr(os, 'lchmod'): 1124 self.assertEqual(src_link_stat.st_mode, dst_stat.st_mode) 1125 self.assertNotEqual(src_stat.st_mode, dst_stat.st_mode) 1126 if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'): 1127 self.assertEqual(src_link_stat.st_flags, dst_stat.st_flags) 1128 1129 @os_helper.skip_unless_xattr 1130 def test_copy2_xattr(self): 1131 tmp_dir = self.mkdtemp() 1132 src = os.path.join(tmp_dir, 'foo') 1133 dst = os.path.join(tmp_dir, 'bar') 1134 write_file(src, 'foo') 1135 os.setxattr(src, 'user.foo', b'42') 1136 shutil.copy2(src, dst) 1137 self.assertEqual( 1138 os.getxattr(src, 'user.foo'), 1139 os.getxattr(dst, 'user.foo')) 1140 os.remove(dst) 1141 1142 def test_copy_return_value(self): 1143 # copy and copy2 both return their destination path. 1144 for fn in (shutil.copy, shutil.copy2): 1145 src_dir = self.mkdtemp() 1146 dst_dir = self.mkdtemp() 1147 src = os.path.join(src_dir, 'foo') 1148 write_file(src, 'foo') 1149 rv = fn(src, dst_dir) 1150 self.assertEqual(rv, os.path.join(dst_dir, 'foo')) 1151 rv = fn(src, os.path.join(dst_dir, 'bar')) 1152 self.assertEqual(rv, os.path.join(dst_dir, 'bar')) 1153 1154 def test_copy_dir(self): 1155 self._test_copy_dir(shutil.copy) 1156 1157 def test_copy2_dir(self): 1158 self._test_copy_dir(shutil.copy2) 1159 1160 def _test_copy_dir(self, copy_func): 1161 src_dir = self.mkdtemp() 1162 src_file = os.path.join(src_dir, 'foo') 1163 dir2 = self.mkdtemp() 1164 dst = os.path.join(src_dir, 'does_not_exist/') 1165 write_file(src_file, 'foo') 1166 if sys.platform == "win32": 1167 err = PermissionError 1168 else: 1169 err = IsADirectoryError 1170 self.assertRaises(err, copy_func, dir2, src_dir) 1171 1172 # raise *err* because of src rather than FileNotFoundError because of dst 1173 self.assertRaises(err, copy_func, dir2, dst) 1174 copy_func(src_file, dir2) # should not raise exceptions 1175 1176 ### shutil.copyfile 1177 1178 @os_helper.skip_unless_symlink 1179 def test_copyfile_symlinks(self): 1180 tmp_dir = self.mkdtemp() 1181 src = os.path.join(tmp_dir, 'src') 1182 dst = os.path.join(tmp_dir, 'dst') 1183 dst_link = os.path.join(tmp_dir, 'dst_link') 1184 link = os.path.join(tmp_dir, 'link') 1185 write_file(src, 'foo') 1186 os.symlink(src, link) 1187 # don't follow 1188 shutil.copyfile(link, dst_link, follow_symlinks=False) 1189 self.assertTrue(os.path.islink(dst_link)) 1190 self.assertEqual(os.readlink(link), os.readlink(dst_link)) 1191 # follow 1192 shutil.copyfile(link, dst) 1193 self.assertFalse(os.path.islink(dst)) 1194 1195 @unittest.skipUnless(hasattr(os, 'link'), 'requires os.link') 1196 def test_dont_copy_file_onto_link_to_itself(self): 1197 # bug 851123. 1198 os.mkdir(TESTFN) 1199 src = os.path.join(TESTFN, 'cheese') 1200 dst = os.path.join(TESTFN, 'shop') 1201 try: 1202 with open(src, 'w', encoding='utf-8') as f: 1203 f.write('cheddar') 1204 try: 1205 os.link(src, dst) 1206 except PermissionError as e: 1207 self.skipTest('os.link(): %s' % e) 1208 self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst) 1209 with open(src, 'r', encoding='utf-8') as f: 1210 self.assertEqual(f.read(), 'cheddar') 1211 os.remove(dst) 1212 finally: 1213 shutil.rmtree(TESTFN, ignore_errors=True) 1214 1215 @os_helper.skip_unless_symlink 1216 def test_dont_copy_file_onto_symlink_to_itself(self): 1217 # bug 851123. 1218 os.mkdir(TESTFN) 1219 src = os.path.join(TESTFN, 'cheese') 1220 dst = os.path.join(TESTFN, 'shop') 1221 try: 1222 with open(src, 'w', encoding='utf-8') as f: 1223 f.write('cheddar') 1224 # Using `src` here would mean we end up with a symlink pointing 1225 # to TESTFN/TESTFN/cheese, while it should point at 1226 # TESTFN/cheese. 1227 os.symlink('cheese', dst) 1228 self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst) 1229 with open(src, 'r', encoding='utf-8') as f: 1230 self.assertEqual(f.read(), 'cheddar') 1231 os.remove(dst) 1232 finally: 1233 shutil.rmtree(TESTFN, ignore_errors=True) 1234 1235 # Issue #3002: copyfile and copytree block indefinitely on named pipes 1236 @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()') 1237 @unittest.skipIf(sys.platform == "vxworks", 1238 "fifo requires special path on VxWorks") 1239 def test_copyfile_named_pipe(self): 1240 try: 1241 os.mkfifo(TESTFN) 1242 except PermissionError as e: 1243 self.skipTest('os.mkfifo(): %s' % e) 1244 try: 1245 self.assertRaises(shutil.SpecialFileError, 1246 shutil.copyfile, TESTFN, TESTFN2) 1247 self.assertRaises(shutil.SpecialFileError, 1248 shutil.copyfile, __file__, TESTFN) 1249 finally: 1250 os.remove(TESTFN) 1251 1252 def test_copyfile_return_value(self): 1253 # copytree returns its destination path. 1254 src_dir = self.mkdtemp() 1255 dst_dir = self.mkdtemp() 1256 dst_file = os.path.join(dst_dir, 'bar') 1257 src_file = os.path.join(src_dir, 'foo') 1258 write_file(src_file, 'foo') 1259 rv = shutil.copyfile(src_file, dst_file) 1260 self.assertTrue(os.path.exists(rv)) 1261 self.assertEqual(read_file(src_file), read_file(dst_file)) 1262 1263 def test_copyfile_same_file(self): 1264 # copyfile() should raise SameFileError if the source and destination 1265 # are the same. 1266 src_dir = self.mkdtemp() 1267 src_file = os.path.join(src_dir, 'foo') 1268 write_file(src_file, 'foo') 1269 self.assertRaises(SameFileError, shutil.copyfile, src_file, src_file) 1270 # But Error should work too, to stay backward compatible. 1271 self.assertRaises(Error, shutil.copyfile, src_file, src_file) 1272 # Make sure file is not corrupted. 1273 self.assertEqual(read_file(src_file), 'foo') 1274 1275 @unittest.skipIf(MACOS or SOLARIS or _winapi, 'On MACOS, Solaris and Windows the errors are not confusing (though different)') 1276 def test_copyfile_nonexistent_dir(self): 1277 # Issue 43219 1278 src_dir = self.mkdtemp() 1279 src_file = os.path.join(src_dir, 'foo') 1280 dst = os.path.join(src_dir, 'does_not_exist/') 1281 write_file(src_file, 'foo') 1282 self.assertRaises(FileNotFoundError, shutil.copyfile, src_file, dst) 1283 1284 def test_copyfile_copy_dir(self): 1285 # Issue 45234 1286 # test copy() and copyfile() raising proper exceptions when src and/or 1287 # dst are directories 1288 src_dir = self.mkdtemp() 1289 src_file = os.path.join(src_dir, 'foo') 1290 dir2 = self.mkdtemp() 1291 dst = os.path.join(src_dir, 'does_not_exist/') 1292 write_file(src_file, 'foo') 1293 if sys.platform == "win32": 1294 err = PermissionError 1295 else: 1296 err = IsADirectoryError 1297 1298 self.assertRaises(err, shutil.copyfile, src_dir, dst) 1299 self.assertRaises(err, shutil.copyfile, src_file, src_dir) 1300 self.assertRaises(err, shutil.copyfile, dir2, src_dir) 1301 1302 1303class TestArchives(BaseTest, unittest.TestCase): 1304 1305 ### shutil.make_archive 1306 1307 @support.requires_zlib() 1308 def test_make_tarball(self): 1309 # creating something to tar 1310 root_dir, base_dir = self._create_files('') 1311 1312 tmpdir2 = self.mkdtemp() 1313 # force shutil to create the directory 1314 os.rmdir(tmpdir2) 1315 # working with relative paths 1316 work_dir = os.path.dirname(tmpdir2) 1317 rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive') 1318 1319 with os_helper.change_cwd(work_dir): 1320 base_name = os.path.abspath(rel_base_name) 1321 tarball = make_archive(rel_base_name, 'gztar', root_dir, '.') 1322 1323 # check if the compressed tarball was created 1324 self.assertEqual(tarball, base_name + '.tar.gz') 1325 self.assertTrue(os.path.isfile(tarball)) 1326 self.assertTrue(tarfile.is_tarfile(tarball)) 1327 with tarfile.open(tarball, 'r:gz') as tf: 1328 self.assertCountEqual(tf.getnames(), 1329 ['.', './sub', './sub2', 1330 './file1', './file2', './sub/file3']) 1331 1332 # trying an uncompressed one 1333 with os_helper.change_cwd(work_dir): 1334 tarball = make_archive(rel_base_name, 'tar', root_dir, '.') 1335 self.assertEqual(tarball, base_name + '.tar') 1336 self.assertTrue(os.path.isfile(tarball)) 1337 self.assertTrue(tarfile.is_tarfile(tarball)) 1338 with tarfile.open(tarball, 'r') as tf: 1339 self.assertCountEqual(tf.getnames(), 1340 ['.', './sub', './sub2', 1341 './file1', './file2', './sub/file3']) 1342 1343 def _tarinfo(self, path): 1344 with tarfile.open(path) as tar: 1345 names = tar.getnames() 1346 names.sort() 1347 return tuple(names) 1348 1349 def _create_files(self, base_dir='dist'): 1350 # creating something to tar 1351 root_dir = self.mkdtemp() 1352 dist = os.path.join(root_dir, base_dir) 1353 os.makedirs(dist, exist_ok=True) 1354 write_file((dist, 'file1'), 'xxx') 1355 write_file((dist, 'file2'), 'xxx') 1356 os.mkdir(os.path.join(dist, 'sub')) 1357 write_file((dist, 'sub', 'file3'), 'xxx') 1358 os.mkdir(os.path.join(dist, 'sub2')) 1359 if base_dir: 1360 write_file((root_dir, 'outer'), 'xxx') 1361 return root_dir, base_dir 1362 1363 @support.requires_zlib() 1364 @unittest.skipUnless(shutil.which('tar'), 1365 'Need the tar command to run') 1366 def test_tarfile_vs_tar(self): 1367 root_dir, base_dir = self._create_files() 1368 base_name = os.path.join(self.mkdtemp(), 'archive') 1369 tarball = make_archive(base_name, 'gztar', root_dir, base_dir) 1370 1371 # check if the compressed tarball was created 1372 self.assertEqual(tarball, base_name + '.tar.gz') 1373 self.assertTrue(os.path.isfile(tarball)) 1374 1375 # now create another tarball using `tar` 1376 tarball2 = os.path.join(root_dir, 'archive2.tar') 1377 tar_cmd = ['tar', '-cf', 'archive2.tar', base_dir] 1378 subprocess.check_call(tar_cmd, cwd=root_dir, 1379 stdout=subprocess.DEVNULL) 1380 1381 self.assertTrue(os.path.isfile(tarball2)) 1382 # let's compare both tarballs 1383 self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2)) 1384 1385 # trying an uncompressed one 1386 tarball = make_archive(base_name, 'tar', root_dir, base_dir) 1387 self.assertEqual(tarball, base_name + '.tar') 1388 self.assertTrue(os.path.isfile(tarball)) 1389 1390 # now for a dry_run 1391 tarball = make_archive(base_name, 'tar', root_dir, base_dir, 1392 dry_run=True) 1393 self.assertEqual(tarball, base_name + '.tar') 1394 self.assertTrue(os.path.isfile(tarball)) 1395 1396 @support.requires_zlib() 1397 def test_make_zipfile(self): 1398 # creating something to zip 1399 root_dir, base_dir = self._create_files() 1400 1401 tmpdir2 = self.mkdtemp() 1402 # force shutil to create the directory 1403 os.rmdir(tmpdir2) 1404 # working with relative paths 1405 work_dir = os.path.dirname(tmpdir2) 1406 rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive') 1407 1408 with os_helper.change_cwd(work_dir): 1409 base_name = os.path.abspath(rel_base_name) 1410 res = make_archive(rel_base_name, 'zip', root_dir) 1411 1412 self.assertEqual(res, base_name + '.zip') 1413 self.assertTrue(os.path.isfile(res)) 1414 self.assertTrue(zipfile.is_zipfile(res)) 1415 with zipfile.ZipFile(res) as zf: 1416 self.assertCountEqual(zf.namelist(), 1417 ['dist/', 'dist/sub/', 'dist/sub2/', 1418 'dist/file1', 'dist/file2', 'dist/sub/file3', 1419 'outer']) 1420 1421 with os_helper.change_cwd(work_dir): 1422 base_name = os.path.abspath(rel_base_name) 1423 res = make_archive(rel_base_name, 'zip', root_dir, base_dir) 1424 1425 self.assertEqual(res, base_name + '.zip') 1426 self.assertTrue(os.path.isfile(res)) 1427 self.assertTrue(zipfile.is_zipfile(res)) 1428 with zipfile.ZipFile(res) as zf: 1429 self.assertCountEqual(zf.namelist(), 1430 ['dist/', 'dist/sub/', 'dist/sub2/', 1431 'dist/file1', 'dist/file2', 'dist/sub/file3']) 1432 1433 @support.requires_zlib() 1434 @unittest.skipUnless(shutil.which('zip'), 1435 'Need the zip command to run') 1436 def test_zipfile_vs_zip(self): 1437 root_dir, base_dir = self._create_files() 1438 base_name = os.path.join(self.mkdtemp(), 'archive') 1439 archive = make_archive(base_name, 'zip', root_dir, base_dir) 1440 1441 # check if ZIP file was created 1442 self.assertEqual(archive, base_name + '.zip') 1443 self.assertTrue(os.path.isfile(archive)) 1444 1445 # now create another ZIP file using `zip` 1446 archive2 = os.path.join(root_dir, 'archive2.zip') 1447 zip_cmd = ['zip', '-q', '-r', 'archive2.zip', base_dir] 1448 subprocess.check_call(zip_cmd, cwd=root_dir, 1449 stdout=subprocess.DEVNULL) 1450 1451 self.assertTrue(os.path.isfile(archive2)) 1452 # let's compare both ZIP files 1453 with zipfile.ZipFile(archive) as zf: 1454 names = zf.namelist() 1455 with zipfile.ZipFile(archive2) as zf: 1456 names2 = zf.namelist() 1457 self.assertEqual(sorted(names), sorted(names2)) 1458 1459 @support.requires_zlib() 1460 @unittest.skipUnless(shutil.which('unzip'), 1461 'Need the unzip command to run') 1462 def test_unzip_zipfile(self): 1463 root_dir, base_dir = self._create_files() 1464 base_name = os.path.join(self.mkdtemp(), 'archive') 1465 archive = make_archive(base_name, 'zip', root_dir, base_dir) 1466 1467 # check if ZIP file was created 1468 self.assertEqual(archive, base_name + '.zip') 1469 self.assertTrue(os.path.isfile(archive)) 1470 1471 # now check the ZIP file using `unzip -t` 1472 zip_cmd = ['unzip', '-t', archive] 1473 with os_helper.change_cwd(root_dir): 1474 try: 1475 subprocess.check_output(zip_cmd, stderr=subprocess.STDOUT) 1476 except subprocess.CalledProcessError as exc: 1477 details = exc.output.decode(errors="replace") 1478 if 'unrecognized option: t' in details: 1479 self.skipTest("unzip doesn't support -t") 1480 msg = "{}\n\n**Unzip Output**\n{}" 1481 self.fail(msg.format(exc, details)) 1482 1483 def test_make_archive(self): 1484 tmpdir = self.mkdtemp() 1485 base_name = os.path.join(tmpdir, 'archive') 1486 self.assertRaises(ValueError, make_archive, base_name, 'xxx') 1487 1488 @support.requires_zlib() 1489 def test_make_archive_owner_group(self): 1490 # testing make_archive with owner and group, with various combinations 1491 # this works even if there's not gid/uid support 1492 if UID_GID_SUPPORT: 1493 group = grp.getgrgid(0)[0] 1494 owner = pwd.getpwuid(0)[0] 1495 else: 1496 group = owner = 'root' 1497 1498 root_dir, base_dir = self._create_files() 1499 base_name = os.path.join(self.mkdtemp(), 'archive') 1500 res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner, 1501 group=group) 1502 self.assertTrue(os.path.isfile(res)) 1503 1504 res = make_archive(base_name, 'zip', root_dir, base_dir) 1505 self.assertTrue(os.path.isfile(res)) 1506 1507 res = make_archive(base_name, 'tar', root_dir, base_dir, 1508 owner=owner, group=group) 1509 self.assertTrue(os.path.isfile(res)) 1510 1511 res = make_archive(base_name, 'tar', root_dir, base_dir, 1512 owner='kjhkjhkjg', group='oihohoh') 1513 self.assertTrue(os.path.isfile(res)) 1514 1515 1516 @support.requires_zlib() 1517 @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support") 1518 def test_tarfile_root_owner(self): 1519 root_dir, base_dir = self._create_files() 1520 base_name = os.path.join(self.mkdtemp(), 'archive') 1521 group = grp.getgrgid(0)[0] 1522 owner = pwd.getpwuid(0)[0] 1523 with os_helper.change_cwd(root_dir): 1524 archive_name = make_archive(base_name, 'gztar', root_dir, 'dist', 1525 owner=owner, group=group) 1526 1527 # check if the compressed tarball was created 1528 self.assertTrue(os.path.isfile(archive_name)) 1529 1530 # now checks the rights 1531 archive = tarfile.open(archive_name) 1532 try: 1533 for member in archive.getmembers(): 1534 self.assertEqual(member.uid, 0) 1535 self.assertEqual(member.gid, 0) 1536 finally: 1537 archive.close() 1538 1539 def test_make_archive_cwd(self): 1540 current_dir = os.getcwd() 1541 def _breaks(*args, **kw): 1542 raise RuntimeError() 1543 1544 register_archive_format('xxx', _breaks, [], 'xxx file') 1545 try: 1546 try: 1547 make_archive('xxx', 'xxx', root_dir=self.mkdtemp()) 1548 except Exception: 1549 pass 1550 self.assertEqual(os.getcwd(), current_dir) 1551 finally: 1552 unregister_archive_format('xxx') 1553 1554 def test_make_tarfile_in_curdir(self): 1555 # Issue #21280 1556 root_dir = self.mkdtemp() 1557 with os_helper.change_cwd(root_dir): 1558 self.assertEqual(make_archive('test', 'tar'), 'test.tar') 1559 self.assertTrue(os.path.isfile('test.tar')) 1560 1561 @support.requires_zlib() 1562 def test_make_zipfile_in_curdir(self): 1563 # Issue #21280 1564 root_dir = self.mkdtemp() 1565 with os_helper.change_cwd(root_dir): 1566 self.assertEqual(make_archive('test', 'zip'), 'test.zip') 1567 self.assertTrue(os.path.isfile('test.zip')) 1568 1569 def test_register_archive_format(self): 1570 1571 self.assertRaises(TypeError, register_archive_format, 'xxx', 1) 1572 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x, 1573 1) 1574 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x, 1575 [(1, 2), (1, 2, 3)]) 1576 1577 register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file') 1578 formats = [name for name, params in get_archive_formats()] 1579 self.assertIn('xxx', formats) 1580 1581 unregister_archive_format('xxx') 1582 formats = [name for name, params in get_archive_formats()] 1583 self.assertNotIn('xxx', formats) 1584 1585 ### shutil.unpack_archive 1586 1587 def check_unpack_archive(self, format): 1588 self.check_unpack_archive_with_converter(format, lambda path: path) 1589 self.check_unpack_archive_with_converter(format, pathlib.Path) 1590 self.check_unpack_archive_with_converter(format, FakePath) 1591 1592 def check_unpack_archive_with_converter(self, format, converter): 1593 root_dir, base_dir = self._create_files() 1594 expected = rlistdir(root_dir) 1595 expected.remove('outer') 1596 1597 base_name = os.path.join(self.mkdtemp(), 'archive') 1598 filename = make_archive(base_name, format, root_dir, base_dir) 1599 1600 # let's try to unpack it now 1601 tmpdir2 = self.mkdtemp() 1602 unpack_archive(converter(filename), converter(tmpdir2)) 1603 self.assertEqual(rlistdir(tmpdir2), expected) 1604 1605 # and again, this time with the format specified 1606 tmpdir3 = self.mkdtemp() 1607 unpack_archive(converter(filename), converter(tmpdir3), format=format) 1608 self.assertEqual(rlistdir(tmpdir3), expected) 1609 1610 self.assertRaises(shutil.ReadError, unpack_archive, converter(TESTFN)) 1611 self.assertRaises(ValueError, unpack_archive, converter(TESTFN), format='xxx') 1612 1613 def test_unpack_archive_tar(self): 1614 self.check_unpack_archive('tar') 1615 1616 @support.requires_zlib() 1617 def test_unpack_archive_gztar(self): 1618 self.check_unpack_archive('gztar') 1619 1620 @support.requires_bz2() 1621 def test_unpack_archive_bztar(self): 1622 self.check_unpack_archive('bztar') 1623 1624 @support.requires_lzma() 1625 @unittest.skipIf(AIX and not _maxdataOK(), "AIX MAXDATA must be 0x20000000 or larger") 1626 def test_unpack_archive_xztar(self): 1627 self.check_unpack_archive('xztar') 1628 1629 @support.requires_zlib() 1630 def test_unpack_archive_zip(self): 1631 self.check_unpack_archive('zip') 1632 1633 def test_unpack_registry(self): 1634 1635 formats = get_unpack_formats() 1636 1637 def _boo(filename, extract_dir, extra): 1638 self.assertEqual(extra, 1) 1639 self.assertEqual(filename, 'stuff.boo') 1640 self.assertEqual(extract_dir, 'xx') 1641 1642 register_unpack_format('Boo', ['.boo', '.b2'], _boo, [('extra', 1)]) 1643 unpack_archive('stuff.boo', 'xx') 1644 1645 # trying to register a .boo unpacker again 1646 self.assertRaises(RegistryError, register_unpack_format, 'Boo2', 1647 ['.boo'], _boo) 1648 1649 # should work now 1650 unregister_unpack_format('Boo') 1651 register_unpack_format('Boo2', ['.boo'], _boo) 1652 self.assertIn(('Boo2', ['.boo'], ''), get_unpack_formats()) 1653 self.assertNotIn(('Boo', ['.boo'], ''), get_unpack_formats()) 1654 1655 # let's leave a clean state 1656 unregister_unpack_format('Boo2') 1657 self.assertEqual(get_unpack_formats(), formats) 1658 1659 1660class TestMisc(BaseTest, unittest.TestCase): 1661 1662 @unittest.skipUnless(hasattr(shutil, 'disk_usage'), 1663 "disk_usage not available on this platform") 1664 def test_disk_usage(self): 1665 usage = shutil.disk_usage(os.path.dirname(__file__)) 1666 for attr in ('total', 'used', 'free'): 1667 self.assertIsInstance(getattr(usage, attr), int) 1668 self.assertGreater(usage.total, 0) 1669 self.assertGreater(usage.used, 0) 1670 self.assertGreaterEqual(usage.free, 0) 1671 self.assertGreaterEqual(usage.total, usage.used) 1672 self.assertGreater(usage.total, usage.free) 1673 1674 # bpo-32557: Check that disk_usage() also accepts a filename 1675 shutil.disk_usage(__file__) 1676 1677 @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support") 1678 @unittest.skipUnless(hasattr(os, 'chown'), 'requires os.chown') 1679 def test_chown(self): 1680 dirname = self.mkdtemp() 1681 filename = tempfile.mktemp(dir=dirname) 1682 write_file(filename, 'testing chown function') 1683 1684 with self.assertRaises(ValueError): 1685 shutil.chown(filename) 1686 1687 with self.assertRaises(LookupError): 1688 shutil.chown(filename, user='non-existing username') 1689 1690 with self.assertRaises(LookupError): 1691 shutil.chown(filename, group='non-existing groupname') 1692 1693 with self.assertRaises(TypeError): 1694 shutil.chown(filename, b'spam') 1695 1696 with self.assertRaises(TypeError): 1697 shutil.chown(filename, 3.14) 1698 1699 uid = os.getuid() 1700 gid = os.getgid() 1701 1702 def check_chown(path, uid=None, gid=None): 1703 s = os.stat(filename) 1704 if uid is not None: 1705 self.assertEqual(uid, s.st_uid) 1706 if gid is not None: 1707 self.assertEqual(gid, s.st_gid) 1708 1709 shutil.chown(filename, uid, gid) 1710 check_chown(filename, uid, gid) 1711 shutil.chown(filename, uid) 1712 check_chown(filename, uid) 1713 shutil.chown(filename, user=uid) 1714 check_chown(filename, uid) 1715 shutil.chown(filename, group=gid) 1716 check_chown(filename, gid=gid) 1717 1718 shutil.chown(dirname, uid, gid) 1719 check_chown(dirname, uid, gid) 1720 shutil.chown(dirname, uid) 1721 check_chown(dirname, uid) 1722 shutil.chown(dirname, user=uid) 1723 check_chown(dirname, uid) 1724 shutil.chown(dirname, group=gid) 1725 check_chown(dirname, gid=gid) 1726 1727 try: 1728 user = pwd.getpwuid(uid)[0] 1729 group = grp.getgrgid(gid)[0] 1730 except KeyError: 1731 # On some systems uid/gid cannot be resolved. 1732 pass 1733 else: 1734 shutil.chown(filename, user, group) 1735 check_chown(filename, uid, gid) 1736 shutil.chown(dirname, user, group) 1737 check_chown(dirname, uid, gid) 1738 1739 1740class TestWhich(BaseTest, unittest.TestCase): 1741 1742 def setUp(self): 1743 self.temp_dir = self.mkdtemp(prefix="Tmp") 1744 # Give the temp_file an ".exe" suffix for all. 1745 # It's needed on Windows and not harmful on other platforms. 1746 self.temp_file = tempfile.NamedTemporaryFile(dir=self.temp_dir, 1747 prefix="Tmp", 1748 suffix=".Exe") 1749 os.chmod(self.temp_file.name, stat.S_IXUSR) 1750 self.addCleanup(self.temp_file.close) 1751 self.dir, self.file = os.path.split(self.temp_file.name) 1752 self.env_path = self.dir 1753 self.curdir = os.curdir 1754 self.ext = ".EXE" 1755 1756 def test_basic(self): 1757 # Given an EXE in a directory, it should be returned. 1758 rv = shutil.which(self.file, path=self.dir) 1759 self.assertEqual(rv, self.temp_file.name) 1760 1761 def test_absolute_cmd(self): 1762 # When given the fully qualified path to an executable that exists, 1763 # it should be returned. 1764 rv = shutil.which(self.temp_file.name, path=self.temp_dir) 1765 self.assertEqual(rv, self.temp_file.name) 1766 1767 def test_relative_cmd(self): 1768 # When given the relative path with a directory part to an executable 1769 # that exists, it should be returned. 1770 base_dir, tail_dir = os.path.split(self.dir) 1771 relpath = os.path.join(tail_dir, self.file) 1772 with os_helper.change_cwd(path=base_dir): 1773 rv = shutil.which(relpath, path=self.temp_dir) 1774 self.assertEqual(rv, relpath) 1775 # But it shouldn't be searched in PATH directories (issue #16957). 1776 with os_helper.change_cwd(path=self.dir): 1777 rv = shutil.which(relpath, path=base_dir) 1778 self.assertIsNone(rv) 1779 1780 def test_cwd(self): 1781 # Issue #16957 1782 base_dir = os.path.dirname(self.dir) 1783 with os_helper.change_cwd(path=self.dir): 1784 rv = shutil.which(self.file, path=base_dir) 1785 if sys.platform == "win32": 1786 # Windows: current directory implicitly on PATH 1787 self.assertEqual(rv, os.path.join(self.curdir, self.file)) 1788 else: 1789 # Other platforms: shouldn't match in the current directory. 1790 self.assertIsNone(rv) 1791 1792 @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0, 1793 'non-root user required') 1794 def test_non_matching_mode(self): 1795 # Set the file read-only and ask for writeable files. 1796 os.chmod(self.temp_file.name, stat.S_IREAD) 1797 if os.access(self.temp_file.name, os.W_OK): 1798 self.skipTest("can't set the file read-only") 1799 rv = shutil.which(self.file, path=self.dir, mode=os.W_OK) 1800 self.assertIsNone(rv) 1801 1802 def test_relative_path(self): 1803 base_dir, tail_dir = os.path.split(self.dir) 1804 with os_helper.change_cwd(path=base_dir): 1805 rv = shutil.which(self.file, path=tail_dir) 1806 self.assertEqual(rv, os.path.join(tail_dir, self.file)) 1807 1808 def test_nonexistent_file(self): 1809 # Return None when no matching executable file is found on the path. 1810 rv = shutil.which("foo.exe", path=self.dir) 1811 self.assertIsNone(rv) 1812 1813 @unittest.skipUnless(sys.platform == "win32", 1814 "pathext check is Windows-only") 1815 def test_pathext_checking(self): 1816 # Ask for the file without the ".exe" extension, then ensure that 1817 # it gets found properly with the extension. 1818 rv = shutil.which(self.file[:-4], path=self.dir) 1819 self.assertEqual(rv, self.temp_file.name[:-4] + self.ext) 1820 1821 def test_environ_path(self): 1822 with os_helper.EnvironmentVarGuard() as env: 1823 env['PATH'] = self.env_path 1824 rv = shutil.which(self.file) 1825 self.assertEqual(rv, self.temp_file.name) 1826 1827 def test_environ_path_empty(self): 1828 # PATH='': no match 1829 with os_helper.EnvironmentVarGuard() as env: 1830 env['PATH'] = '' 1831 with unittest.mock.patch('os.confstr', return_value=self.dir, \ 1832 create=True), \ 1833 support.swap_attr(os, 'defpath', self.dir), \ 1834 os_helper.change_cwd(self.dir): 1835 rv = shutil.which(self.file) 1836 self.assertIsNone(rv) 1837 1838 def test_environ_path_cwd(self): 1839 expected_cwd = os.path.basename(self.temp_file.name) 1840 if sys.platform == "win32": 1841 curdir = os.curdir 1842 if isinstance(expected_cwd, bytes): 1843 curdir = os.fsencode(curdir) 1844 expected_cwd = os.path.join(curdir, expected_cwd) 1845 1846 # PATH=':': explicitly looks in the current directory 1847 with os_helper.EnvironmentVarGuard() as env: 1848 env['PATH'] = os.pathsep 1849 with unittest.mock.patch('os.confstr', return_value=self.dir, \ 1850 create=True), \ 1851 support.swap_attr(os, 'defpath', self.dir): 1852 rv = shutil.which(self.file) 1853 self.assertIsNone(rv) 1854 1855 # look in current directory 1856 with os_helper.change_cwd(self.dir): 1857 rv = shutil.which(self.file) 1858 self.assertEqual(rv, expected_cwd) 1859 1860 def test_environ_path_missing(self): 1861 with os_helper.EnvironmentVarGuard() as env: 1862 env.pop('PATH', None) 1863 1864 # without confstr 1865 with unittest.mock.patch('os.confstr', side_effect=ValueError, \ 1866 create=True), \ 1867 support.swap_attr(os, 'defpath', self.dir): 1868 rv = shutil.which(self.file) 1869 self.assertEqual(rv, self.temp_file.name) 1870 1871 # with confstr 1872 with unittest.mock.patch('os.confstr', return_value=self.dir, \ 1873 create=True), \ 1874 support.swap_attr(os, 'defpath', ''): 1875 rv = shutil.which(self.file) 1876 self.assertEqual(rv, self.temp_file.name) 1877 1878 def test_empty_path(self): 1879 base_dir = os.path.dirname(self.dir) 1880 with os_helper.change_cwd(path=self.dir), \ 1881 os_helper.EnvironmentVarGuard() as env: 1882 env['PATH'] = self.env_path 1883 rv = shutil.which(self.file, path='') 1884 self.assertIsNone(rv) 1885 1886 def test_empty_path_no_PATH(self): 1887 with os_helper.EnvironmentVarGuard() as env: 1888 env.pop('PATH', None) 1889 rv = shutil.which(self.file) 1890 self.assertIsNone(rv) 1891 1892 @unittest.skipUnless(sys.platform == "win32", 'test specific to Windows') 1893 def test_pathext(self): 1894 ext = ".xyz" 1895 temp_filexyz = tempfile.NamedTemporaryFile(dir=self.temp_dir, 1896 prefix="Tmp2", suffix=ext) 1897 os.chmod(temp_filexyz.name, stat.S_IXUSR) 1898 self.addCleanup(temp_filexyz.close) 1899 1900 # strip path and extension 1901 program = os.path.basename(temp_filexyz.name) 1902 program = os.path.splitext(program)[0] 1903 1904 with os_helper.EnvironmentVarGuard() as env: 1905 env['PATHEXT'] = ext 1906 rv = shutil.which(program, path=self.temp_dir) 1907 self.assertEqual(rv, temp_filexyz.name) 1908 1909 # Issue 40592: See https://bugs.python.org/issue40592 1910 @unittest.skipUnless(sys.platform == "win32", 'test specific to Windows') 1911 def test_pathext_with_empty_str(self): 1912 ext = ".xyz" 1913 temp_filexyz = tempfile.NamedTemporaryFile(dir=self.temp_dir, 1914 prefix="Tmp2", suffix=ext) 1915 self.addCleanup(temp_filexyz.close) 1916 1917 # strip path and extension 1918 program = os.path.basename(temp_filexyz.name) 1919 program = os.path.splitext(program)[0] 1920 1921 with os_helper.EnvironmentVarGuard() as env: 1922 env['PATHEXT'] = f"{ext};" # note the ; 1923 rv = shutil.which(program, path=self.temp_dir) 1924 self.assertEqual(rv, temp_filexyz.name) 1925 1926 1927class TestWhichBytes(TestWhich): 1928 def setUp(self): 1929 TestWhich.setUp(self) 1930 self.dir = os.fsencode(self.dir) 1931 self.file = os.fsencode(self.file) 1932 self.temp_file.name = os.fsencode(self.temp_file.name) 1933 self.curdir = os.fsencode(self.curdir) 1934 self.ext = os.fsencode(self.ext) 1935 1936 1937class TestMove(BaseTest, unittest.TestCase): 1938 1939 def setUp(self): 1940 filename = "foo" 1941 self.src_dir = self.mkdtemp() 1942 self.dst_dir = self.mkdtemp() 1943 self.src_file = os.path.join(self.src_dir, filename) 1944 self.dst_file = os.path.join(self.dst_dir, filename) 1945 with open(self.src_file, "wb") as f: 1946 f.write(b"spam") 1947 1948 def _check_move_file(self, src, dst, real_dst): 1949 with open(src, "rb") as f: 1950 contents = f.read() 1951 shutil.move(src, dst) 1952 with open(real_dst, "rb") as f: 1953 self.assertEqual(contents, f.read()) 1954 self.assertFalse(os.path.exists(src)) 1955 1956 def _check_move_dir(self, src, dst, real_dst): 1957 contents = sorted(os.listdir(src)) 1958 shutil.move(src, dst) 1959 self.assertEqual(contents, sorted(os.listdir(real_dst))) 1960 self.assertFalse(os.path.exists(src)) 1961 1962 def test_move_file(self): 1963 # Move a file to another location on the same filesystem. 1964 self._check_move_file(self.src_file, self.dst_file, self.dst_file) 1965 1966 def test_move_file_to_dir(self): 1967 # Move a file inside an existing dir on the same filesystem. 1968 self._check_move_file(self.src_file, self.dst_dir, self.dst_file) 1969 1970 def test_move_file_to_dir_pathlike_src(self): 1971 # Move a pathlike file to another location on the same filesystem. 1972 src = pathlib.Path(self.src_file) 1973 self._check_move_file(src, self.dst_dir, self.dst_file) 1974 1975 def test_move_file_to_dir_pathlike_dst(self): 1976 # Move a file to another pathlike location on the same filesystem. 1977 dst = pathlib.Path(self.dst_dir) 1978 self._check_move_file(self.src_file, dst, self.dst_file) 1979 1980 @mock_rename 1981 def test_move_file_other_fs(self): 1982 # Move a file to an existing dir on another filesystem. 1983 self.test_move_file() 1984 1985 @mock_rename 1986 def test_move_file_to_dir_other_fs(self): 1987 # Move a file to another location on another filesystem. 1988 self.test_move_file_to_dir() 1989 1990 def test_move_dir(self): 1991 # Move a dir to another location on the same filesystem. 1992 dst_dir = tempfile.mktemp(dir=self.mkdtemp()) 1993 try: 1994 self._check_move_dir(self.src_dir, dst_dir, dst_dir) 1995 finally: 1996 os_helper.rmtree(dst_dir) 1997 1998 @mock_rename 1999 def test_move_dir_other_fs(self): 2000 # Move a dir to another location on another filesystem. 2001 self.test_move_dir() 2002 2003 def test_move_dir_to_dir(self): 2004 # Move a dir inside an existing dir on the same filesystem. 2005 self._check_move_dir(self.src_dir, self.dst_dir, 2006 os.path.join(self.dst_dir, os.path.basename(self.src_dir))) 2007 2008 @mock_rename 2009 def test_move_dir_to_dir_other_fs(self): 2010 # Move a dir inside an existing dir on another filesystem. 2011 self.test_move_dir_to_dir() 2012 2013 def test_move_dir_sep_to_dir(self): 2014 self._check_move_dir(self.src_dir + os.path.sep, self.dst_dir, 2015 os.path.join(self.dst_dir, os.path.basename(self.src_dir))) 2016 2017 @unittest.skipUnless(os.path.altsep, 'requires os.path.altsep') 2018 def test_move_dir_altsep_to_dir(self): 2019 self._check_move_dir(self.src_dir + os.path.altsep, self.dst_dir, 2020 os.path.join(self.dst_dir, os.path.basename(self.src_dir))) 2021 2022 def test_existing_file_inside_dest_dir(self): 2023 # A file with the same name inside the destination dir already exists. 2024 with open(self.dst_file, "wb"): 2025 pass 2026 self.assertRaises(shutil.Error, shutil.move, self.src_file, self.dst_dir) 2027 2028 def test_dont_move_dir_in_itself(self): 2029 # Moving a dir inside itself raises an Error. 2030 dst = os.path.join(self.src_dir, "bar") 2031 self.assertRaises(shutil.Error, shutil.move, self.src_dir, dst) 2032 2033 def test_destinsrc_false_negative(self): 2034 os.mkdir(TESTFN) 2035 try: 2036 for src, dst in [('srcdir', 'srcdir/dest')]: 2037 src = os.path.join(TESTFN, src) 2038 dst = os.path.join(TESTFN, dst) 2039 self.assertTrue(shutil._destinsrc(src, dst), 2040 msg='_destinsrc() wrongly concluded that ' 2041 'dst (%s) is not in src (%s)' % (dst, src)) 2042 finally: 2043 os_helper.rmtree(TESTFN) 2044 2045 def test_destinsrc_false_positive(self): 2046 os.mkdir(TESTFN) 2047 try: 2048 for src, dst in [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]: 2049 src = os.path.join(TESTFN, src) 2050 dst = os.path.join(TESTFN, dst) 2051 self.assertFalse(shutil._destinsrc(src, dst), 2052 msg='_destinsrc() wrongly concluded that ' 2053 'dst (%s) is in src (%s)' % (dst, src)) 2054 finally: 2055 os_helper.rmtree(TESTFN) 2056 2057 @os_helper.skip_unless_symlink 2058 @mock_rename 2059 def test_move_file_symlink(self): 2060 dst = os.path.join(self.src_dir, 'bar') 2061 os.symlink(self.src_file, dst) 2062 shutil.move(dst, self.dst_file) 2063 self.assertTrue(os.path.islink(self.dst_file)) 2064 self.assertTrue(os.path.samefile(self.src_file, self.dst_file)) 2065 2066 @os_helper.skip_unless_symlink 2067 @mock_rename 2068 def test_move_file_symlink_to_dir(self): 2069 filename = "bar" 2070 dst = os.path.join(self.src_dir, filename) 2071 os.symlink(self.src_file, dst) 2072 shutil.move(dst, self.dst_dir) 2073 final_link = os.path.join(self.dst_dir, filename) 2074 self.assertTrue(os.path.islink(final_link)) 2075 self.assertTrue(os.path.samefile(self.src_file, final_link)) 2076 2077 @os_helper.skip_unless_symlink 2078 @mock_rename 2079 def test_move_dangling_symlink(self): 2080 src = os.path.join(self.src_dir, 'baz') 2081 dst = os.path.join(self.src_dir, 'bar') 2082 os.symlink(src, dst) 2083 dst_link = os.path.join(self.dst_dir, 'quux') 2084 shutil.move(dst, dst_link) 2085 self.assertTrue(os.path.islink(dst_link)) 2086 self.assertEqual(os.path.realpath(src), os.path.realpath(dst_link)) 2087 2088 @os_helper.skip_unless_symlink 2089 @mock_rename 2090 def test_move_dir_symlink(self): 2091 src = os.path.join(self.src_dir, 'baz') 2092 dst = os.path.join(self.src_dir, 'bar') 2093 os.mkdir(src) 2094 os.symlink(src, dst) 2095 dst_link = os.path.join(self.dst_dir, 'quux') 2096 shutil.move(dst, dst_link) 2097 self.assertTrue(os.path.islink(dst_link)) 2098 self.assertTrue(os.path.samefile(src, dst_link)) 2099 2100 def test_move_return_value(self): 2101 rv = shutil.move(self.src_file, self.dst_dir) 2102 self.assertEqual(rv, 2103 os.path.join(self.dst_dir, os.path.basename(self.src_file))) 2104 2105 def test_move_as_rename_return_value(self): 2106 rv = shutil.move(self.src_file, os.path.join(self.dst_dir, 'bar')) 2107 self.assertEqual(rv, os.path.join(self.dst_dir, 'bar')) 2108 2109 @mock_rename 2110 def test_move_file_special_function(self): 2111 moved = [] 2112 def _copy(src, dst): 2113 moved.append((src, dst)) 2114 shutil.move(self.src_file, self.dst_dir, copy_function=_copy) 2115 self.assertEqual(len(moved), 1) 2116 2117 @mock_rename 2118 def test_move_dir_special_function(self): 2119 moved = [] 2120 def _copy(src, dst): 2121 moved.append((src, dst)) 2122 os_helper.create_empty_file(os.path.join(self.src_dir, 'child')) 2123 os_helper.create_empty_file(os.path.join(self.src_dir, 'child1')) 2124 shutil.move(self.src_dir, self.dst_dir, copy_function=_copy) 2125 self.assertEqual(len(moved), 3) 2126 2127 def test_move_dir_caseinsensitive(self): 2128 # Renames a folder to the same name 2129 # but a different case. 2130 2131 self.src_dir = self.mkdtemp() 2132 dst_dir = os.path.join( 2133 os.path.dirname(self.src_dir), 2134 os.path.basename(self.src_dir).upper()) 2135 self.assertNotEqual(self.src_dir, dst_dir) 2136 2137 try: 2138 shutil.move(self.src_dir, dst_dir) 2139 self.assertTrue(os.path.isdir(dst_dir)) 2140 finally: 2141 os.rmdir(dst_dir) 2142 2143 2144 @unittest.skipUnless(hasattr(os, 'geteuid') and os.geteuid() == 0 2145 and hasattr(os, 'lchflags') 2146 and hasattr(stat, 'SF_IMMUTABLE') 2147 and hasattr(stat, 'UF_OPAQUE'), 2148 'root privileges required') 2149 def test_move_dir_permission_denied(self): 2150 # bpo-42782: shutil.move should not create destination directories 2151 # if the source directory cannot be removed. 2152 try: 2153 os.mkdir(TESTFN_SRC) 2154 os.lchflags(TESTFN_SRC, stat.SF_IMMUTABLE) 2155 2156 # Testing on an empty immutable directory 2157 # TESTFN_DST should not exist if shutil.move failed 2158 self.assertRaises(PermissionError, shutil.move, TESTFN_SRC, TESTFN_DST) 2159 self.assertFalse(TESTFN_DST in os.listdir()) 2160 2161 # Create a file and keep the directory immutable 2162 os.lchflags(TESTFN_SRC, stat.UF_OPAQUE) 2163 os_helper.create_empty_file(os.path.join(TESTFN_SRC, 'child')) 2164 os.lchflags(TESTFN_SRC, stat.SF_IMMUTABLE) 2165 2166 # Testing on a non-empty immutable directory 2167 # TESTFN_DST should not exist if shutil.move failed 2168 self.assertRaises(PermissionError, shutil.move, TESTFN_SRC, TESTFN_DST) 2169 self.assertFalse(TESTFN_DST in os.listdir()) 2170 finally: 2171 if os.path.exists(TESTFN_SRC): 2172 os.lchflags(TESTFN_SRC, stat.UF_OPAQUE) 2173 os_helper.rmtree(TESTFN_SRC) 2174 if os.path.exists(TESTFN_DST): 2175 os.lchflags(TESTFN_DST, stat.UF_OPAQUE) 2176 os_helper.rmtree(TESTFN_DST) 2177 2178 2179class TestCopyFile(unittest.TestCase): 2180 2181 class Faux(object): 2182 _entered = False 2183 _exited_with = None 2184 _raised = False 2185 def __init__(self, raise_in_exit=False, suppress_at_exit=True): 2186 self._raise_in_exit = raise_in_exit 2187 self._suppress_at_exit = suppress_at_exit 2188 def read(self, *args): 2189 return '' 2190 def __enter__(self): 2191 self._entered = True 2192 def __exit__(self, exc_type, exc_val, exc_tb): 2193 self._exited_with = exc_type, exc_val, exc_tb 2194 if self._raise_in_exit: 2195 self._raised = True 2196 raise OSError("Cannot close") 2197 return self._suppress_at_exit 2198 2199 def test_w_source_open_fails(self): 2200 def _open(filename, mode='r'): 2201 if filename == 'srcfile': 2202 raise OSError('Cannot open "srcfile"') 2203 assert 0 # shouldn't reach here. 2204 2205 with support.swap_attr(shutil, 'open', _open): 2206 with self.assertRaises(OSError): 2207 shutil.copyfile('srcfile', 'destfile') 2208 2209 @unittest.skipIf(MACOS, "skipped on macOS") 2210 def test_w_dest_open_fails(self): 2211 srcfile = self.Faux() 2212 2213 def _open(filename, mode='r'): 2214 if filename == 'srcfile': 2215 return srcfile 2216 if filename == 'destfile': 2217 raise OSError('Cannot open "destfile"') 2218 assert 0 # shouldn't reach here. 2219 2220 with support.swap_attr(shutil, 'open', _open): 2221 shutil.copyfile('srcfile', 'destfile') 2222 self.assertTrue(srcfile._entered) 2223 self.assertTrue(srcfile._exited_with[0] is OSError) 2224 self.assertEqual(srcfile._exited_with[1].args, 2225 ('Cannot open "destfile"',)) 2226 2227 @unittest.skipIf(MACOS, "skipped on macOS") 2228 def test_w_dest_close_fails(self): 2229 srcfile = self.Faux() 2230 destfile = self.Faux(True) 2231 2232 def _open(filename, mode='r'): 2233 if filename == 'srcfile': 2234 return srcfile 2235 if filename == 'destfile': 2236 return destfile 2237 assert 0 # shouldn't reach here. 2238 2239 with support.swap_attr(shutil, 'open', _open): 2240 shutil.copyfile('srcfile', 'destfile') 2241 self.assertTrue(srcfile._entered) 2242 self.assertTrue(destfile._entered) 2243 self.assertTrue(destfile._raised) 2244 self.assertTrue(srcfile._exited_with[0] is OSError) 2245 self.assertEqual(srcfile._exited_with[1].args, 2246 ('Cannot close',)) 2247 2248 @unittest.skipIf(MACOS, "skipped on macOS") 2249 def test_w_source_close_fails(self): 2250 2251 srcfile = self.Faux(True) 2252 destfile = self.Faux() 2253 2254 def _open(filename, mode='r'): 2255 if filename == 'srcfile': 2256 return srcfile 2257 if filename == 'destfile': 2258 return destfile 2259 assert 0 # shouldn't reach here. 2260 2261 with support.swap_attr(shutil, 'open', _open): 2262 with self.assertRaises(OSError): 2263 shutil.copyfile('srcfile', 'destfile') 2264 self.assertTrue(srcfile._entered) 2265 self.assertTrue(destfile._entered) 2266 self.assertFalse(destfile._raised) 2267 self.assertTrue(srcfile._exited_with[0] is None) 2268 self.assertTrue(srcfile._raised) 2269 2270 2271class TestCopyFileObj(unittest.TestCase): 2272 FILESIZE = 2 * 1024 * 1024 2273 2274 @classmethod 2275 def setUpClass(cls): 2276 write_test_file(TESTFN, cls.FILESIZE) 2277 2278 @classmethod 2279 def tearDownClass(cls): 2280 os_helper.unlink(TESTFN) 2281 os_helper.unlink(TESTFN2) 2282 2283 def tearDown(self): 2284 os_helper.unlink(TESTFN2) 2285 2286 @contextlib.contextmanager 2287 def get_files(self): 2288 with open(TESTFN, "rb") as src: 2289 with open(TESTFN2, "wb") as dst: 2290 yield (src, dst) 2291 2292 def assert_files_eq(self, src, dst): 2293 with open(src, 'rb') as fsrc: 2294 with open(dst, 'rb') as fdst: 2295 self.assertEqual(fsrc.read(), fdst.read()) 2296 2297 def test_content(self): 2298 with self.get_files() as (src, dst): 2299 shutil.copyfileobj(src, dst) 2300 self.assert_files_eq(TESTFN, TESTFN2) 2301 2302 def test_file_not_closed(self): 2303 with self.get_files() as (src, dst): 2304 shutil.copyfileobj(src, dst) 2305 assert not src.closed 2306 assert not dst.closed 2307 2308 def test_file_offset(self): 2309 with self.get_files() as (src, dst): 2310 shutil.copyfileobj(src, dst) 2311 self.assertEqual(src.tell(), self.FILESIZE) 2312 self.assertEqual(dst.tell(), self.FILESIZE) 2313 2314 @unittest.skipIf(os.name != 'nt', "Windows only") 2315 def test_win_impl(self): 2316 # Make sure alternate Windows implementation is called. 2317 with unittest.mock.patch("shutil._copyfileobj_readinto") as m: 2318 shutil.copyfile(TESTFN, TESTFN2) 2319 assert m.called 2320 2321 # File size is 2 MiB but max buf size should be 1 MiB. 2322 self.assertEqual(m.call_args[0][2], 1 * 1024 * 1024) 2323 2324 # If file size < 1 MiB memoryview() length must be equal to 2325 # the actual file size. 2326 with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f: 2327 f.write(b'foo') 2328 fname = f.name 2329 self.addCleanup(os_helper.unlink, fname) 2330 with unittest.mock.patch("shutil._copyfileobj_readinto") as m: 2331 shutil.copyfile(fname, TESTFN2) 2332 self.assertEqual(m.call_args[0][2], 3) 2333 2334 # Empty files should not rely on readinto() variant. 2335 with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f: 2336 pass 2337 fname = f.name 2338 self.addCleanup(os_helper.unlink, fname) 2339 with unittest.mock.patch("shutil._copyfileobj_readinto") as m: 2340 shutil.copyfile(fname, TESTFN2) 2341 assert not m.called 2342 self.assert_files_eq(fname, TESTFN2) 2343 2344 2345class _ZeroCopyFileTest(object): 2346 """Tests common to all zero-copy APIs.""" 2347 FILESIZE = (10 * 1024 * 1024) # 10 MiB 2348 FILEDATA = b"" 2349 PATCHPOINT = "" 2350 2351 @classmethod 2352 def setUpClass(cls): 2353 write_test_file(TESTFN, cls.FILESIZE) 2354 with open(TESTFN, 'rb') as f: 2355 cls.FILEDATA = f.read() 2356 assert len(cls.FILEDATA) == cls.FILESIZE 2357 2358 @classmethod 2359 def tearDownClass(cls): 2360 os_helper.unlink(TESTFN) 2361 2362 def tearDown(self): 2363 os_helper.unlink(TESTFN2) 2364 2365 @contextlib.contextmanager 2366 def get_files(self): 2367 with open(TESTFN, "rb") as src: 2368 with open(TESTFN2, "wb") as dst: 2369 yield (src, dst) 2370 2371 def zerocopy_fun(self, *args, **kwargs): 2372 raise NotImplementedError("must be implemented in subclass") 2373 2374 def reset(self): 2375 self.tearDown() 2376 self.tearDownClass() 2377 self.setUpClass() 2378 self.setUp() 2379 2380 # --- 2381 2382 def test_regular_copy(self): 2383 with self.get_files() as (src, dst): 2384 self.zerocopy_fun(src, dst) 2385 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) 2386 # Make sure the fallback function is not called. 2387 with self.get_files() as (src, dst): 2388 with unittest.mock.patch('shutil.copyfileobj') as m: 2389 shutil.copyfile(TESTFN, TESTFN2) 2390 assert not m.called 2391 2392 def test_same_file(self): 2393 self.addCleanup(self.reset) 2394 with self.get_files() as (src, dst): 2395 with self.assertRaises(Exception): 2396 self.zerocopy_fun(src, src) 2397 # Make sure src file is not corrupted. 2398 self.assertEqual(read_file(TESTFN, binary=True), self.FILEDATA) 2399 2400 def test_non_existent_src(self): 2401 name = tempfile.mktemp(dir=os.getcwd()) 2402 with self.assertRaises(FileNotFoundError) as cm: 2403 shutil.copyfile(name, "new") 2404 self.assertEqual(cm.exception.filename, name) 2405 2406 def test_empty_file(self): 2407 srcname = TESTFN + 'src' 2408 dstname = TESTFN + 'dst' 2409 self.addCleanup(lambda: os_helper.unlink(srcname)) 2410 self.addCleanup(lambda: os_helper.unlink(dstname)) 2411 with open(srcname, "wb"): 2412 pass 2413 2414 with open(srcname, "rb") as src: 2415 with open(dstname, "wb") as dst: 2416 self.zerocopy_fun(src, dst) 2417 2418 self.assertEqual(read_file(dstname, binary=True), b"") 2419 2420 def test_unhandled_exception(self): 2421 with unittest.mock.patch(self.PATCHPOINT, 2422 side_effect=ZeroDivisionError): 2423 self.assertRaises(ZeroDivisionError, 2424 shutil.copyfile, TESTFN, TESTFN2) 2425 2426 def test_exception_on_first_call(self): 2427 # Emulate a case where the first call to the zero-copy 2428 # function raises an exception in which case the function is 2429 # supposed to give up immediately. 2430 with unittest.mock.patch(self.PATCHPOINT, 2431 side_effect=OSError(errno.EINVAL, "yo")): 2432 with self.get_files() as (src, dst): 2433 with self.assertRaises(_GiveupOnFastCopy): 2434 self.zerocopy_fun(src, dst) 2435 2436 def test_filesystem_full(self): 2437 # Emulate a case where filesystem is full and sendfile() fails 2438 # on first call. 2439 with unittest.mock.patch(self.PATCHPOINT, 2440 side_effect=OSError(errno.ENOSPC, "yo")): 2441 with self.get_files() as (src, dst): 2442 self.assertRaises(OSError, self.zerocopy_fun, src, dst) 2443 2444 2445@unittest.skipIf(not SUPPORTS_SENDFILE, 'os.sendfile() not supported') 2446class TestZeroCopySendfile(_ZeroCopyFileTest, unittest.TestCase): 2447 PATCHPOINT = "os.sendfile" 2448 2449 def zerocopy_fun(self, fsrc, fdst): 2450 return shutil._fastcopy_sendfile(fsrc, fdst) 2451 2452 def test_non_regular_file_src(self): 2453 with io.BytesIO(self.FILEDATA) as src: 2454 with open(TESTFN2, "wb") as dst: 2455 with self.assertRaises(_GiveupOnFastCopy): 2456 self.zerocopy_fun(src, dst) 2457 shutil.copyfileobj(src, dst) 2458 2459 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) 2460 2461 def test_non_regular_file_dst(self): 2462 with open(TESTFN, "rb") as src: 2463 with io.BytesIO() as dst: 2464 with self.assertRaises(_GiveupOnFastCopy): 2465 self.zerocopy_fun(src, dst) 2466 shutil.copyfileobj(src, dst) 2467 dst.seek(0) 2468 self.assertEqual(dst.read(), self.FILEDATA) 2469 2470 def test_exception_on_second_call(self): 2471 def sendfile(*args, **kwargs): 2472 if not flag: 2473 flag.append(None) 2474 return orig_sendfile(*args, **kwargs) 2475 else: 2476 raise OSError(errno.EBADF, "yo") 2477 2478 flag = [] 2479 orig_sendfile = os.sendfile 2480 with unittest.mock.patch('os.sendfile', create=True, 2481 side_effect=sendfile): 2482 with self.get_files() as (src, dst): 2483 with self.assertRaises(OSError) as cm: 2484 shutil._fastcopy_sendfile(src, dst) 2485 assert flag 2486 self.assertEqual(cm.exception.errno, errno.EBADF) 2487 2488 def test_cant_get_size(self): 2489 # Emulate a case where src file size cannot be determined. 2490 # Internally bufsize will be set to a small value and 2491 # sendfile() will be called repeatedly. 2492 with unittest.mock.patch('os.fstat', side_effect=OSError) as m: 2493 with self.get_files() as (src, dst): 2494 shutil._fastcopy_sendfile(src, dst) 2495 assert m.called 2496 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) 2497 2498 def test_small_chunks(self): 2499 # Force internal file size detection to be smaller than the 2500 # actual file size. We want to force sendfile() to be called 2501 # multiple times, also in order to emulate a src fd which gets 2502 # bigger while it is being copied. 2503 mock = unittest.mock.Mock() 2504 mock.st_size = 65536 + 1 2505 with unittest.mock.patch('os.fstat', return_value=mock) as m: 2506 with self.get_files() as (src, dst): 2507 shutil._fastcopy_sendfile(src, dst) 2508 assert m.called 2509 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) 2510 2511 def test_big_chunk(self): 2512 # Force internal file size detection to be +100MB bigger than 2513 # the actual file size. Make sure sendfile() does not rely on 2514 # file size value except for (maybe) a better throughput / 2515 # performance. 2516 mock = unittest.mock.Mock() 2517 mock.st_size = self.FILESIZE + (100 * 1024 * 1024) 2518 with unittest.mock.patch('os.fstat', return_value=mock) as m: 2519 with self.get_files() as (src, dst): 2520 shutil._fastcopy_sendfile(src, dst) 2521 assert m.called 2522 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) 2523 2524 def test_blocksize_arg(self): 2525 with unittest.mock.patch('os.sendfile', 2526 side_effect=ZeroDivisionError) as m: 2527 self.assertRaises(ZeroDivisionError, 2528 shutil.copyfile, TESTFN, TESTFN2) 2529 blocksize = m.call_args[0][3] 2530 # Make sure file size and the block size arg passed to 2531 # sendfile() are the same. 2532 self.assertEqual(blocksize, os.path.getsize(TESTFN)) 2533 # ...unless we're dealing with a small file. 2534 os_helper.unlink(TESTFN2) 2535 write_file(TESTFN2, b"hello", binary=True) 2536 self.addCleanup(os_helper.unlink, TESTFN2 + '3') 2537 self.assertRaises(ZeroDivisionError, 2538 shutil.copyfile, TESTFN2, TESTFN2 + '3') 2539 blocksize = m.call_args[0][3] 2540 self.assertEqual(blocksize, 2 ** 23) 2541 2542 def test_file2file_not_supported(self): 2543 # Emulate a case where sendfile() only support file->socket 2544 # fds. In such a case copyfile() is supposed to skip the 2545 # fast-copy attempt from then on. 2546 assert shutil._USE_CP_SENDFILE 2547 try: 2548 with unittest.mock.patch( 2549 self.PATCHPOINT, 2550 side_effect=OSError(errno.ENOTSOCK, "yo")) as m: 2551 with self.get_files() as (src, dst): 2552 with self.assertRaises(_GiveupOnFastCopy): 2553 shutil._fastcopy_sendfile(src, dst) 2554 assert m.called 2555 assert not shutil._USE_CP_SENDFILE 2556 2557 with unittest.mock.patch(self.PATCHPOINT) as m: 2558 shutil.copyfile(TESTFN, TESTFN2) 2559 assert not m.called 2560 finally: 2561 shutil._USE_CP_SENDFILE = True 2562 2563 2564@unittest.skipIf(not MACOS, 'macOS only') 2565class TestZeroCopyMACOS(_ZeroCopyFileTest, unittest.TestCase): 2566 PATCHPOINT = "posix._fcopyfile" 2567 2568 def zerocopy_fun(self, src, dst): 2569 return shutil._fastcopy_fcopyfile(src, dst, posix._COPYFILE_DATA) 2570 2571 2572class TestGetTerminalSize(unittest.TestCase): 2573 def test_does_not_crash(self): 2574 """Check if get_terminal_size() returns a meaningful value. 2575 2576 There's no easy portable way to actually check the size of the 2577 terminal, so let's check if it returns something sensible instead. 2578 """ 2579 size = shutil.get_terminal_size() 2580 self.assertGreaterEqual(size.columns, 0) 2581 self.assertGreaterEqual(size.lines, 0) 2582 2583 def test_os_environ_first(self): 2584 "Check if environment variables have precedence" 2585 2586 with os_helper.EnvironmentVarGuard() as env: 2587 env['COLUMNS'] = '777' 2588 del env['LINES'] 2589 size = shutil.get_terminal_size() 2590 self.assertEqual(size.columns, 777) 2591 2592 with os_helper.EnvironmentVarGuard() as env: 2593 del env['COLUMNS'] 2594 env['LINES'] = '888' 2595 size = shutil.get_terminal_size() 2596 self.assertEqual(size.lines, 888) 2597 2598 def test_bad_environ(self): 2599 with os_helper.EnvironmentVarGuard() as env: 2600 env['COLUMNS'] = 'xxx' 2601 env['LINES'] = 'yyy' 2602 size = shutil.get_terminal_size() 2603 self.assertGreaterEqual(size.columns, 0) 2604 self.assertGreaterEqual(size.lines, 0) 2605 2606 @unittest.skipUnless(os.isatty(sys.__stdout__.fileno()), "not on tty") 2607 @unittest.skipUnless(hasattr(os, 'get_terminal_size'), 2608 'need os.get_terminal_size()') 2609 def test_stty_match(self): 2610 """Check if stty returns the same results ignoring env 2611 2612 This test will fail if stdin and stdout are connected to 2613 different terminals with different sizes. Nevertheless, such 2614 situations should be pretty rare. 2615 """ 2616 try: 2617 size = subprocess.check_output(['stty', 'size']).decode().split() 2618 except (FileNotFoundError, PermissionError, 2619 subprocess.CalledProcessError): 2620 self.skipTest("stty invocation failed") 2621 expected = (int(size[1]), int(size[0])) # reversed order 2622 2623 with os_helper.EnvironmentVarGuard() as env: 2624 del env['LINES'] 2625 del env['COLUMNS'] 2626 actual = shutil.get_terminal_size() 2627 2628 self.assertEqual(expected, actual) 2629 2630 def test_fallback(self): 2631 with os_helper.EnvironmentVarGuard() as env: 2632 del env['LINES'] 2633 del env['COLUMNS'] 2634 2635 # sys.__stdout__ has no fileno() 2636 with support.swap_attr(sys, '__stdout__', None): 2637 size = shutil.get_terminal_size(fallback=(10, 20)) 2638 self.assertEqual(size.columns, 10) 2639 self.assertEqual(size.lines, 20) 2640 2641 # sys.__stdout__ is not a terminal on Unix 2642 # or fileno() not in (0, 1, 2) on Windows 2643 with open(os.devnull, 'w', encoding='utf-8') as f, \ 2644 support.swap_attr(sys, '__stdout__', f): 2645 size = shutil.get_terminal_size(fallback=(30, 40)) 2646 self.assertEqual(size.columns, 30) 2647 self.assertEqual(size.lines, 40) 2648 2649 2650class PublicAPITests(unittest.TestCase): 2651 """Ensures that the correct values are exposed in the public API.""" 2652 2653 def test_module_all_attribute(self): 2654 self.assertTrue(hasattr(shutil, '__all__')) 2655 target_api = ['copyfileobj', 'copyfile', 'copymode', 'copystat', 2656 'copy', 'copy2', 'copytree', 'move', 'rmtree', 'Error', 2657 'SpecialFileError', 'ExecError', 'make_archive', 2658 'get_archive_formats', 'register_archive_format', 2659 'unregister_archive_format', 'get_unpack_formats', 2660 'register_unpack_format', 'unregister_unpack_format', 2661 'unpack_archive', 'ignore_patterns', 'chown', 'which', 2662 'get_terminal_size', 'SameFileError'] 2663 if hasattr(os, 'statvfs') or os.name == 'nt': 2664 target_api.append('disk_usage') 2665 self.assertEqual(set(shutil.__all__), set(target_api)) 2666 2667 2668if __name__ == '__main__': 2669 unittest.main() 2670