1# Copyright (C) 2003 Python Software Foundation 2 3import unittest 4import shutil 5import tempfile 6import sys 7import stat 8import os 9import os.path 10import errno 11import subprocess 12from distutils.spawn import find_executable 13from shutil import (make_archive, 14 register_archive_format, unregister_archive_format, 15 get_archive_formats) 16import tarfile 17import warnings 18 19from test import test_support as support 20from test.test_support import TESTFN, check_warnings, captured_stdout 21 22TESTFN2 = TESTFN + "2" 23 24try: 25 import grp 26 import pwd 27 UID_GID_SUPPORT = True 28except ImportError: 29 UID_GID_SUPPORT = False 30 31try: 32 import zlib 33except ImportError: 34 zlib = None 35 36try: 37 import zipfile 38 import zlib 39 ZIP_SUPPORT = True 40except ImportError: 41 ZIP_SUPPORT = find_executable('zip') 42 43class TestShutil(unittest.TestCase): 44 45 def setUp(self): 46 super(TestShutil, self).setUp() 47 self.tempdirs = [] 48 49 def tearDown(self): 50 super(TestShutil, self).tearDown() 51 while self.tempdirs: 52 d = self.tempdirs.pop() 53 shutil.rmtree(d, os.name in ('nt', 'cygwin')) 54 55 def write_file(self, path, content='xxx'): 56 """Writes a file in the given path. 57 58 59 path can be a string or a sequence. 60 """ 61 if isinstance(path, (list, tuple)): 62 path = os.path.join(*path) 63 f = open(path, 'w') 64 try: 65 f.write(content) 66 finally: 67 f.close() 68 69 def mkdtemp(self): 70 """Create a temporary directory that will be cleaned up. 71 72 Returns the path of the directory. 73 """ 74 d = tempfile.mkdtemp() 75 self.tempdirs.append(d) 76 return d 77 def test_rmtree_errors(self): 78 # filename is guaranteed not to exist 79 filename = tempfile.mktemp() 80 self.assertRaises(OSError, shutil.rmtree, filename) 81 82 @unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod()') 83 @unittest.skipIf(sys.platform[:6] == 'cygwin', 84 "This test can't be run on Cygwin (issue #1071513).") 85 @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0, 86 "This test can't be run reliably as root (issue #1076467).") 87 def test_on_error(self): 88 self.errorState = 0 89 os.mkdir(TESTFN) 90 self.childpath = os.path.join(TESTFN, 'a') 91 f = open(self.childpath, 'w') 92 f.close() 93 old_dir_mode = os.stat(TESTFN).st_mode 94 old_child_mode = os.stat(self.childpath).st_mode 95 # Make unwritable. 96 os.chmod(self.childpath, stat.S_IREAD) 97 os.chmod(TESTFN, stat.S_IREAD) 98 99 shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror) 100 # Test whether onerror has actually been called. 101 self.assertEqual(self.errorState, 2, 102 "Expected call to onerror function did not happen.") 103 104 # Make writable again. 105 os.chmod(TESTFN, old_dir_mode) 106 os.chmod(self.childpath, old_child_mode) 107 108 # Clean up. 109 shutil.rmtree(TESTFN) 110 111 def check_args_to_onerror(self, func, arg, exc): 112 # test_rmtree_errors deliberately runs rmtree 113 # on a directory that is chmod 400, which will fail. 114 # This function is run when shutil.rmtree fails. 115 # 99.9% of the time it initially fails to remove 116 # a file in the directory, so the first time through 117 # func is os.remove. 118 # However, some Linux machines running ZFS on 119 # FUSE experienced a failure earlier in the process 120 # at os.listdir. The first failure may legally 121 # be either. 122 if self.errorState == 0: 123 if func is os.remove: 124 self.assertEqual(arg, self.childpath) 125 else: 126 self.assertIs(func, os.listdir, 127 "func must be either os.remove or os.listdir") 128 self.assertEqual(arg, TESTFN) 129 self.assertTrue(issubclass(exc[0], OSError)) 130 self.errorState = 1 131 else: 132 self.assertEqual(func, os.rmdir) 133 self.assertEqual(arg, TESTFN) 134 self.assertTrue(issubclass(exc[0], OSError)) 135 self.errorState = 2 136 137 def test_rmtree_dont_delete_file(self): 138 # When called on a file instead of a directory, don't delete it. 139 handle, path = tempfile.mkstemp() 140 os.fdopen(handle).close() 141 self.assertRaises(OSError, shutil.rmtree, path) 142 os.remove(path) 143 144 def test_copytree_simple(self): 145 def write_data(path, data): 146 f = open(path, "w") 147 f.write(data) 148 f.close() 149 150 def read_data(path): 151 f = open(path) 152 data = f.read() 153 f.close() 154 return data 155 156 src_dir = tempfile.mkdtemp() 157 dst_dir = os.path.join(tempfile.mkdtemp(), 'destination') 158 159 write_data(os.path.join(src_dir, 'test.txt'), '123') 160 161 os.mkdir(os.path.join(src_dir, 'test_dir')) 162 write_data(os.path.join(src_dir, 'test_dir', 'test.txt'), '456') 163 164 try: 165 shutil.copytree(src_dir, dst_dir) 166 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test.txt'))) 167 self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'test_dir'))) 168 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test_dir', 169 'test.txt'))) 170 actual = read_data(os.path.join(dst_dir, 'test.txt')) 171 self.assertEqual(actual, '123') 172 actual = read_data(os.path.join(dst_dir, 'test_dir', 'test.txt')) 173 self.assertEqual(actual, '456') 174 finally: 175 for path in ( 176 os.path.join(src_dir, 'test.txt'), 177 os.path.join(dst_dir, 'test.txt'), 178 os.path.join(src_dir, 'test_dir', 'test.txt'), 179 os.path.join(dst_dir, 'test_dir', 'test.txt'), 180 ): 181 if os.path.exists(path): 182 os.remove(path) 183 for path in (src_dir, 184 os.path.dirname(dst_dir) 185 ): 186 if os.path.exists(path): 187 shutil.rmtree(path) 188 189 def test_copytree_with_exclude(self): 190 191 def write_data(path, data): 192 f = open(path, "w") 193 f.write(data) 194 f.close() 195 196 def read_data(path): 197 f = open(path) 198 data = f.read() 199 f.close() 200 return data 201 202 # creating data 203 join = os.path.join 204 exists = os.path.exists 205 src_dir = tempfile.mkdtemp() 206 try: 207 dst_dir = join(tempfile.mkdtemp(), 'destination') 208 write_data(join(src_dir, 'test.txt'), '123') 209 write_data(join(src_dir, 'test.tmp'), '123') 210 os.mkdir(join(src_dir, 'test_dir')) 211 write_data(join(src_dir, 'test_dir', 'test.txt'), '456') 212 os.mkdir(join(src_dir, 'test_dir2')) 213 write_data(join(src_dir, 'test_dir2', 'test.txt'), '456') 214 os.mkdir(join(src_dir, 'test_dir2', 'subdir')) 215 os.mkdir(join(src_dir, 'test_dir2', 'subdir2')) 216 write_data(join(src_dir, 'test_dir2', 'subdir', 'test.txt'), '456') 217 write_data(join(src_dir, 'test_dir2', 'subdir2', 'test.py'), '456') 218 219 220 # testing glob-like patterns 221 try: 222 patterns = shutil.ignore_patterns('*.tmp', 'test_dir2') 223 shutil.copytree(src_dir, dst_dir, ignore=patterns) 224 # checking the result: some elements should not be copied 225 self.assertTrue(exists(join(dst_dir, 'test.txt'))) 226 self.assertTrue(not exists(join(dst_dir, 'test.tmp'))) 227 self.assertTrue(not exists(join(dst_dir, 'test_dir2'))) 228 finally: 229 if os.path.exists(dst_dir): 230 shutil.rmtree(dst_dir) 231 try: 232 patterns = shutil.ignore_patterns('*.tmp', 'subdir*') 233 shutil.copytree(src_dir, dst_dir, ignore=patterns) 234 # checking the result: some elements should not be copied 235 self.assertTrue(not exists(join(dst_dir, 'test.tmp'))) 236 self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir2'))) 237 self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir'))) 238 finally: 239 if os.path.exists(dst_dir): 240 shutil.rmtree(dst_dir) 241 242 # testing callable-style 243 try: 244 def _filter(src, names): 245 res = [] 246 for name in names: 247 path = os.path.join(src, name) 248 249 if (os.path.isdir(path) and 250 path.split()[-1] == 'subdir'): 251 res.append(name) 252 elif os.path.splitext(path)[-1] in ('.py'): 253 res.append(name) 254 return res 255 256 shutil.copytree(src_dir, dst_dir, ignore=_filter) 257 258 # checking the result: some elements should not be copied 259 self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir2', 260 'test.py'))) 261 self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir'))) 262 263 finally: 264 if os.path.exists(dst_dir): 265 shutil.rmtree(dst_dir) 266 finally: 267 shutil.rmtree(src_dir) 268 shutil.rmtree(os.path.dirname(dst_dir)) 269 270 if hasattr(os, "symlink"): 271 def test_dont_copy_file_onto_link_to_itself(self): 272 # bug 851123. 273 os.mkdir(TESTFN) 274 src = os.path.join(TESTFN, 'cheese') 275 dst = os.path.join(TESTFN, 'shop') 276 try: 277 f = open(src, 'w') 278 f.write('cheddar') 279 f.close() 280 281 os.link(src, dst) 282 self.assertRaises(shutil.Error, shutil.copyfile, src, dst) 283 with open(src, 'r') as f: 284 self.assertEqual(f.read(), 'cheddar') 285 os.remove(dst) 286 287 # Using `src` here would mean we end up with a symlink pointing 288 # to TESTFN/TESTFN/cheese, while it should point at 289 # TESTFN/cheese. 290 os.symlink('cheese', dst) 291 self.assertRaises(shutil.Error, shutil.copyfile, src, dst) 292 with open(src, 'r') as f: 293 self.assertEqual(f.read(), 'cheddar') 294 os.remove(dst) 295 finally: 296 try: 297 shutil.rmtree(TESTFN) 298 except OSError: 299 pass 300 301 def test_rmtree_on_symlink(self): 302 # bug 1669. 303 os.mkdir(TESTFN) 304 try: 305 src = os.path.join(TESTFN, 'cheese') 306 dst = os.path.join(TESTFN, 'shop') 307 os.mkdir(src) 308 os.symlink(src, dst) 309 self.assertRaises(OSError, shutil.rmtree, dst) 310 finally: 311 shutil.rmtree(TESTFN, ignore_errors=True) 312 313 # Issue #3002: copyfile and copytree block indefinitely on named pipes 314 @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()') 315 def test_copyfile_named_pipe(self): 316 os.mkfifo(TESTFN) 317 try: 318 self.assertRaises(shutil.SpecialFileError, 319 shutil.copyfile, TESTFN, TESTFN2) 320 self.assertRaises(shutil.SpecialFileError, 321 shutil.copyfile, __file__, TESTFN) 322 finally: 323 os.remove(TESTFN) 324 325 @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()') 326 def test_copytree_named_pipe(self): 327 os.mkdir(TESTFN) 328 try: 329 subdir = os.path.join(TESTFN, "subdir") 330 os.mkdir(subdir) 331 pipe = os.path.join(subdir, "mypipe") 332 os.mkfifo(pipe) 333 try: 334 shutil.copytree(TESTFN, TESTFN2) 335 except shutil.Error as e: 336 errors = e.args[0] 337 self.assertEqual(len(errors), 1) 338 src, dst, error_msg = errors[0] 339 self.assertEqual("`%s` is a named pipe" % pipe, error_msg) 340 else: 341 self.fail("shutil.Error should have been raised") 342 finally: 343 shutil.rmtree(TESTFN, ignore_errors=True) 344 shutil.rmtree(TESTFN2, ignore_errors=True) 345 346 @unittest.skipUnless(hasattr(os, 'chflags') and 347 hasattr(errno, 'EOPNOTSUPP') and 348 hasattr(errno, 'ENOTSUP'), 349 "requires os.chflags, EOPNOTSUPP & ENOTSUP") 350 def test_copystat_handles_harmless_chflags_errors(self): 351 tmpdir = self.mkdtemp() 352 file1 = os.path.join(tmpdir, 'file1') 353 file2 = os.path.join(tmpdir, 'file2') 354 self.write_file(file1, 'xxx') 355 self.write_file(file2, 'xxx') 356 357 def make_chflags_raiser(err): 358 ex = OSError() 359 360 def _chflags_raiser(path, flags): 361 ex.errno = err 362 raise ex 363 return _chflags_raiser 364 old_chflags = os.chflags 365 try: 366 for err in errno.EOPNOTSUPP, errno.ENOTSUP: 367 os.chflags = make_chflags_raiser(err) 368 shutil.copystat(file1, file2) 369 # assert others errors break it 370 os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP) 371 self.assertRaises(OSError, shutil.copystat, file1, file2) 372 finally: 373 os.chflags = old_chflags 374 375 @unittest.skipUnless(zlib, "requires zlib") 376 def test_make_tarball(self): 377 # creating something to tar 378 root_dir, base_dir = self._create_files('') 379 380 tmpdir2 = self.mkdtemp() 381 # force shutil to create the directory 382 os.rmdir(tmpdir2) 383 # working with relative paths 384 work_dir = os.path.dirname(tmpdir2) 385 rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive') 386 387 with support.change_cwd(work_dir): 388 base_name = os.path.abspath(rel_base_name) 389 tarball = make_archive(rel_base_name, 'gztar', root_dir, '.') 390 391 # check if the compressed tarball was created 392 self.assertEqual(tarball, base_name + '.tar.gz') 393 self.assertTrue(os.path.isfile(tarball)) 394 self.assertTrue(tarfile.is_tarfile(tarball)) 395 with tarfile.open(tarball, 'r:gz') as tf: 396 self.assertEqual(sorted(tf.getnames()), 397 ['.', './file1', './file2', 398 './sub', './sub/file3', './sub2']) 399 400 # trying an uncompressed one 401 with support.change_cwd(work_dir): 402 tarball = make_archive(rel_base_name, 'tar', root_dir, '.') 403 self.assertEqual(tarball, base_name + '.tar') 404 self.assertTrue(os.path.isfile(tarball)) 405 self.assertTrue(tarfile.is_tarfile(tarball)) 406 with tarfile.open(tarball, 'r') as tf: 407 self.assertEqual(sorted(tf.getnames()), 408 ['.', './file1', './file2', 409 './sub', './sub/file3', './sub2']) 410 411 def _tarinfo(self, path): 412 with tarfile.open(path) as tar: 413 names = tar.getnames() 414 names.sort() 415 return tuple(names) 416 417 def _create_files(self, base_dir='dist'): 418 # creating something to tar 419 root_dir = self.mkdtemp() 420 dist = os.path.join(root_dir, base_dir) 421 if not os.path.isdir(dist): 422 os.makedirs(dist) 423 self.write_file((dist, 'file1'), 'xxx') 424 self.write_file((dist, 'file2'), 'xxx') 425 os.mkdir(os.path.join(dist, 'sub')) 426 self.write_file((dist, 'sub', 'file3'), 'xxx') 427 os.mkdir(os.path.join(dist, 'sub2')) 428 if base_dir: 429 self.write_file((root_dir, 'outer'), 'xxx') 430 return root_dir, base_dir 431 432 @unittest.skipUnless(zlib, "Requires zlib") 433 @unittest.skipUnless(find_executable('tar'), 434 'Need the tar command to run') 435 def test_tarfile_vs_tar(self): 436 root_dir, base_dir = self._create_files() 437 base_name = os.path.join(self.mkdtemp(), 'archive') 438 tarball = make_archive(base_name, 'gztar', root_dir, base_dir) 439 440 # check if the compressed tarball was created 441 self.assertEqual(tarball, base_name + '.tar.gz') 442 self.assertTrue(os.path.isfile(tarball)) 443 444 # now create another tarball using `tar` 445 tarball2 = os.path.join(root_dir, 'archive2.tar') 446 tar_cmd = ['tar', '-cf', 'archive2.tar', base_dir] 447 subprocess.check_call(tar_cmd, cwd=root_dir) 448 449 self.assertTrue(os.path.isfile(tarball2)) 450 # let's compare both tarballs 451 self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2)) 452 453 # trying an uncompressed one 454 tarball = make_archive(base_name, 'tar', root_dir, base_dir) 455 self.assertEqual(tarball, base_name + '.tar') 456 self.assertTrue(os.path.isfile(tarball)) 457 458 # now for a dry_run 459 tarball = make_archive(base_name, 'tar', root_dir, base_dir, 460 dry_run=True) 461 self.assertEqual(tarball, base_name + '.tar') 462 self.assertTrue(os.path.isfile(tarball)) 463 464 @unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run') 465 def test_make_zipfile(self): 466 # creating something to zip 467 root_dir, base_dir = self._create_files() 468 469 tmpdir2 = self.mkdtemp() 470 # force shutil to create the directory 471 os.rmdir(tmpdir2) 472 # working with relative paths 473 work_dir = os.path.dirname(tmpdir2) 474 rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive') 475 476 with support.change_cwd(work_dir): 477 base_name = os.path.abspath(rel_base_name) 478 res = make_archive(rel_base_name, 'zip', root_dir) 479 480 self.assertEqual(res, base_name + '.zip') 481 self.assertTrue(os.path.isfile(res)) 482 self.assertTrue(zipfile.is_zipfile(res)) 483 with zipfile.ZipFile(res) as zf: 484 self.assertEqual(sorted(zf.namelist()), 485 ['dist/', 'dist/file1', 'dist/file2', 486 'dist/sub/', 'dist/sub/file3', 'dist/sub2/', 487 'outer']) 488 support.unlink(res) 489 490 with support.change_cwd(work_dir): 491 base_name = os.path.abspath(rel_base_name) 492 res = make_archive(rel_base_name, 'zip', root_dir, base_dir) 493 494 self.assertEqual(res, base_name + '.zip') 495 self.assertTrue(os.path.isfile(res)) 496 self.assertTrue(zipfile.is_zipfile(res)) 497 with zipfile.ZipFile(res) as zf: 498 self.assertEqual(sorted(zf.namelist()), 499 ['dist/', 'dist/file1', 'dist/file2', 500 'dist/sub/', 'dist/sub/file3', 'dist/sub2/']) 501 502 @unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run') 503 @unittest.skipUnless(find_executable('zip'), 504 'Need the zip command to run') 505 def test_zipfile_vs_zip(self): 506 root_dir, base_dir = self._create_files() 507 base_name = os.path.join(self.mkdtemp(), 'archive') 508 archive = make_archive(base_name, 'zip', root_dir, base_dir) 509 510 # check if ZIP file was created 511 self.assertEqual(archive, base_name + '.zip') 512 self.assertTrue(os.path.isfile(archive)) 513 514 # now create another ZIP file using `zip` 515 archive2 = os.path.join(root_dir, 'archive2.zip') 516 zip_cmd = ['zip', '-q', '-r', 'archive2.zip', base_dir] 517 subprocess.check_call(zip_cmd, cwd=root_dir) 518 519 self.assertTrue(os.path.isfile(archive2)) 520 # let's compare both ZIP files 521 with zipfile.ZipFile(archive) as zf: 522 names = zf.namelist() 523 with zipfile.ZipFile(archive2) as zf: 524 names2 = zf.namelist() 525 self.assertEqual(sorted(names), sorted(names2)) 526 527 @unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run') 528 @unittest.skipUnless(find_executable('unzip'), 529 'Need the unzip command to run') 530 def test_unzip_zipfile(self): 531 root_dir, base_dir = self._create_files() 532 base_name = os.path.join(self.mkdtemp(), 'archive') 533 archive = make_archive(base_name, 'zip', root_dir, base_dir) 534 535 # check if ZIP file was created 536 self.assertEqual(archive, base_name + '.zip') 537 self.assertTrue(os.path.isfile(archive)) 538 539 # now check the ZIP file using `unzip -t` 540 zip_cmd = ['unzip', '-t', archive] 541 with support.change_cwd(root_dir): 542 try: 543 subprocess.check_output(zip_cmd, stderr=subprocess.STDOUT) 544 except subprocess.CalledProcessError as exc: 545 details = exc.output 546 if 'unrecognized option: t' in details: 547 self.skipTest("unzip doesn't support -t") 548 msg = "{}\n\n**Unzip Output**\n{}" 549 self.fail(msg.format(exc, details)) 550 551 def test_make_archive(self): 552 tmpdir = self.mkdtemp() 553 base_name = os.path.join(tmpdir, 'archive') 554 self.assertRaises(ValueError, make_archive, base_name, 'xxx') 555 556 @unittest.skipUnless(zlib, "Requires zlib") 557 def test_make_archive_owner_group(self): 558 # testing make_archive with owner and group, with various combinations 559 # this works even if there's not gid/uid support 560 if UID_GID_SUPPORT: 561 group = grp.getgrgid(0)[0] 562 owner = pwd.getpwuid(0)[0] 563 else: 564 group = owner = 'root' 565 566 root_dir, base_dir = self._create_files() 567 base_name = os.path.join(self.mkdtemp(), 'archive') 568 res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner, 569 group=group) 570 self.assertTrue(os.path.isfile(res)) 571 572 res = make_archive(base_name, 'zip', root_dir, base_dir) 573 self.assertTrue(os.path.isfile(res)) 574 575 res = make_archive(base_name, 'tar', root_dir, base_dir, 576 owner=owner, group=group) 577 self.assertTrue(os.path.isfile(res)) 578 579 res = make_archive(base_name, 'tar', root_dir, base_dir, 580 owner='kjhkjhkjg', group='oihohoh') 581 self.assertTrue(os.path.isfile(res)) 582 583 @unittest.skipUnless(zlib, "Requires zlib") 584 @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support") 585 def test_tarfile_root_owner(self): 586 root_dir, base_dir = self._create_files() 587 base_name = os.path.join(self.mkdtemp(), 'archive') 588 group = grp.getgrgid(0)[0] 589 owner = pwd.getpwuid(0)[0] 590 with support.change_cwd(root_dir): 591 archive_name = make_archive(base_name, 'gztar', root_dir, 'dist', 592 owner=owner, group=group) 593 594 # check if the compressed tarball was created 595 self.assertTrue(os.path.isfile(archive_name)) 596 597 # now checks the rights 598 archive = tarfile.open(archive_name) 599 try: 600 for member in archive.getmembers(): 601 self.assertEqual(member.uid, 0) 602 self.assertEqual(member.gid, 0) 603 finally: 604 archive.close() 605 606 def test_make_archive_cwd(self): 607 current_dir = os.getcwd() 608 def _breaks(*args, **kw): 609 raise RuntimeError() 610 611 register_archive_format('xxx', _breaks, [], 'xxx file') 612 try: 613 try: 614 make_archive('xxx', 'xxx', root_dir=self.mkdtemp()) 615 except Exception: 616 pass 617 self.assertEqual(os.getcwd(), current_dir) 618 finally: 619 unregister_archive_format('xxx') 620 621 def test_make_tarfile_in_curdir(self): 622 # Issue #21280 623 root_dir = self.mkdtemp() 624 saved_dir = os.getcwd() 625 try: 626 os.chdir(root_dir) 627 self.assertEqual(make_archive('test', 'tar'), 'test.tar') 628 self.assertTrue(os.path.isfile('test.tar')) 629 finally: 630 os.chdir(saved_dir) 631 632 @unittest.skipUnless(zlib, "Requires zlib") 633 def test_make_zipfile_in_curdir(self): 634 # Issue #21280 635 root_dir = self.mkdtemp() 636 saved_dir = os.getcwd() 637 try: 638 os.chdir(root_dir) 639 self.assertEqual(make_archive('test', 'zip'), 'test.zip') 640 self.assertTrue(os.path.isfile('test.zip')) 641 finally: 642 os.chdir(saved_dir) 643 644 def test_register_archive_format(self): 645 646 self.assertRaises(TypeError, register_archive_format, 'xxx', 1) 647 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x, 648 1) 649 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x, 650 [(1, 2), (1, 2, 3)]) 651 652 register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file') 653 formats = [name for name, params in get_archive_formats()] 654 self.assertIn('xxx', formats) 655 656 unregister_archive_format('xxx') 657 formats = [name for name, params in get_archive_formats()] 658 self.assertNotIn('xxx', formats) 659 660 661class TestMove(unittest.TestCase): 662 663 def setUp(self): 664 filename = "foo" 665 self.src_dir = tempfile.mkdtemp() 666 self.dst_dir = tempfile.mkdtemp() 667 self.src_file = os.path.join(self.src_dir, filename) 668 self.dst_file = os.path.join(self.dst_dir, filename) 669 # Try to create a dir in the current directory, hoping that it is 670 # not located on the same filesystem as the system tmp dir. 671 try: 672 self.dir_other_fs = tempfile.mkdtemp( 673 dir=os.path.dirname(__file__)) 674 self.file_other_fs = os.path.join(self.dir_other_fs, 675 filename) 676 except OSError: 677 self.dir_other_fs = None 678 with open(self.src_file, "wb") as f: 679 f.write("spam") 680 681 def tearDown(self): 682 for d in (self.src_dir, self.dst_dir, self.dir_other_fs): 683 try: 684 if d: 685 shutil.rmtree(d) 686 except: 687 pass 688 689 def _check_move_file(self, src, dst, real_dst): 690 with open(src, "rb") as f: 691 contents = f.read() 692 shutil.move(src, dst) 693 with open(real_dst, "rb") as f: 694 self.assertEqual(contents, f.read()) 695 self.assertFalse(os.path.exists(src)) 696 697 def _check_move_dir(self, src, dst, real_dst): 698 contents = sorted(os.listdir(src)) 699 shutil.move(src, dst) 700 self.assertEqual(contents, sorted(os.listdir(real_dst))) 701 self.assertFalse(os.path.exists(src)) 702 703 def test_move_file(self): 704 # Move a file to another location on the same filesystem. 705 self._check_move_file(self.src_file, self.dst_file, self.dst_file) 706 707 def test_move_file_to_dir(self): 708 # Move a file inside an existing dir on the same filesystem. 709 self._check_move_file(self.src_file, self.dst_dir, self.dst_file) 710 711 def test_move_file_other_fs(self): 712 # Move a file to an existing dir on another filesystem. 713 if not self.dir_other_fs: 714 self.skipTest('dir on other filesystem not available') 715 self._check_move_file(self.src_file, self.file_other_fs, 716 self.file_other_fs) 717 718 def test_move_file_to_dir_other_fs(self): 719 # Move a file to another location on another filesystem. 720 if not self.dir_other_fs: 721 self.skipTest('dir on other filesystem not available') 722 self._check_move_file(self.src_file, self.dir_other_fs, 723 self.file_other_fs) 724 725 def test_move_dir(self): 726 # Move a dir to another location on the same filesystem. 727 dst_dir = tempfile.mktemp() 728 try: 729 self._check_move_dir(self.src_dir, dst_dir, dst_dir) 730 finally: 731 try: 732 shutil.rmtree(dst_dir) 733 except: 734 pass 735 736 def test_move_dir_other_fs(self): 737 # Move a dir to another location on another filesystem. 738 if not self.dir_other_fs: 739 self.skipTest('dir on other filesystem not available') 740 dst_dir = tempfile.mktemp(dir=self.dir_other_fs) 741 try: 742 self._check_move_dir(self.src_dir, dst_dir, dst_dir) 743 finally: 744 try: 745 shutil.rmtree(dst_dir) 746 except: 747 pass 748 749 def test_move_dir_to_dir(self): 750 # Move a dir inside an existing dir on the same filesystem. 751 self._check_move_dir(self.src_dir, self.dst_dir, 752 os.path.join(self.dst_dir, os.path.basename(self.src_dir))) 753 754 def test_move_dir_to_dir_other_fs(self): 755 # Move a dir inside an existing dir on another filesystem. 756 if not self.dir_other_fs: 757 self.skipTest('dir on other filesystem not available') 758 self._check_move_dir(self.src_dir, self.dir_other_fs, 759 os.path.join(self.dir_other_fs, os.path.basename(self.src_dir))) 760 761 def test_move_dir_sep_to_dir(self): 762 self._check_move_dir(self.src_dir + os.path.sep, self.dst_dir, 763 os.path.join(self.dst_dir, os.path.basename(self.src_dir))) 764 765 @unittest.skipUnless(os.path.altsep, 'requires os.path.altsep') 766 def test_move_dir_altsep_to_dir(self): 767 self._check_move_dir(self.src_dir + os.path.altsep, self.dst_dir, 768 os.path.join(self.dst_dir, os.path.basename(self.src_dir))) 769 770 def test_existing_file_inside_dest_dir(self): 771 # A file with the same name inside the destination dir already exists. 772 with open(self.dst_file, "wb"): 773 pass 774 self.assertRaises(shutil.Error, shutil.move, self.src_file, self.dst_dir) 775 776 def test_dont_move_dir_in_itself(self): 777 # Moving a dir inside itself raises an Error. 778 dst = os.path.join(self.src_dir, "bar") 779 self.assertRaises(shutil.Error, shutil.move, self.src_dir, dst) 780 781 def test_destinsrc_false_negative(self): 782 os.mkdir(TESTFN) 783 try: 784 for src, dst in [('srcdir', 'srcdir/dest')]: 785 src = os.path.join(TESTFN, src) 786 dst = os.path.join(TESTFN, dst) 787 self.assertTrue(shutil._destinsrc(src, dst), 788 msg='_destinsrc() wrongly concluded that ' 789 'dst (%s) is not in src (%s)' % (dst, src)) 790 finally: 791 shutil.rmtree(TESTFN, ignore_errors=True) 792 793 def test_destinsrc_false_positive(self): 794 os.mkdir(TESTFN) 795 try: 796 for src, dst in [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]: 797 src = os.path.join(TESTFN, src) 798 dst = os.path.join(TESTFN, dst) 799 self.assertFalse(shutil._destinsrc(src, dst), 800 msg='_destinsrc() wrongly concluded that ' 801 'dst (%s) is in src (%s)' % (dst, src)) 802 finally: 803 shutil.rmtree(TESTFN, ignore_errors=True) 804 805 806class TestCopyFile(unittest.TestCase): 807 808 _delete = False 809 810 class Faux(object): 811 _entered = False 812 _exited_with = None 813 _raised = False 814 def __init__(self, raise_in_exit=False, suppress_at_exit=True): 815 self._raise_in_exit = raise_in_exit 816 self._suppress_at_exit = suppress_at_exit 817 def read(self, *args): 818 return '' 819 def __enter__(self): 820 self._entered = True 821 def __exit__(self, exc_type, exc_val, exc_tb): 822 self._exited_with = exc_type, exc_val, exc_tb 823 if self._raise_in_exit: 824 self._raised = True 825 raise IOError("Cannot close") 826 return self._suppress_at_exit 827 828 def tearDown(self): 829 if self._delete: 830 del shutil.open 831 832 def _set_shutil_open(self, func): 833 shutil.open = func 834 self._delete = True 835 836 def test_w_source_open_fails(self): 837 def _open(filename, mode='r'): 838 if filename == 'srcfile': 839 raise IOError('Cannot open "srcfile"') 840 assert 0 # shouldn't reach here. 841 842 self._set_shutil_open(_open) 843 844 self.assertRaises(IOError, shutil.copyfile, 'srcfile', 'destfile') 845 846 def test_w_dest_open_fails(self): 847 848 srcfile = self.Faux() 849 850 def _open(filename, mode='r'): 851 if filename == 'srcfile': 852 return srcfile 853 if filename == 'destfile': 854 raise IOError('Cannot open "destfile"') 855 assert 0 # shouldn't reach here. 856 857 self._set_shutil_open(_open) 858 859 shutil.copyfile('srcfile', 'destfile') 860 self.assertTrue(srcfile._entered) 861 self.assertTrue(srcfile._exited_with[0] is IOError) 862 self.assertEqual(srcfile._exited_with[1].args, 863 ('Cannot open "destfile"',)) 864 865 def test_w_dest_close_fails(self): 866 867 srcfile = self.Faux() 868 destfile = self.Faux(True) 869 870 def _open(filename, mode='r'): 871 if filename == 'srcfile': 872 return srcfile 873 if filename == 'destfile': 874 return destfile 875 assert 0 # shouldn't reach here. 876 877 self._set_shutil_open(_open) 878 879 shutil.copyfile('srcfile', 'destfile') 880 self.assertTrue(srcfile._entered) 881 self.assertTrue(destfile._entered) 882 self.assertTrue(destfile._raised) 883 self.assertTrue(srcfile._exited_with[0] is IOError) 884 self.assertEqual(srcfile._exited_with[1].args, 885 ('Cannot close',)) 886 887 def test_w_source_close_fails(self): 888 889 srcfile = self.Faux(True) 890 destfile = self.Faux() 891 892 def _open(filename, mode='r'): 893 if filename == 'srcfile': 894 return srcfile 895 if filename == 'destfile': 896 return destfile 897 assert 0 # shouldn't reach here. 898 899 self._set_shutil_open(_open) 900 901 self.assertRaises(IOError, 902 shutil.copyfile, 'srcfile', 'destfile') 903 self.assertTrue(srcfile._entered) 904 self.assertTrue(destfile._entered) 905 self.assertFalse(destfile._raised) 906 self.assertTrue(srcfile._exited_with[0] is None) 907 self.assertTrue(srcfile._raised) 908 909 def test_move_dir_caseinsensitive(self): 910 # Renames a folder to the same name 911 # but a different case. 912 913 self.src_dir = tempfile.mkdtemp() 914 dst_dir = os.path.join( 915 os.path.dirname(self.src_dir), 916 os.path.basename(self.src_dir).upper()) 917 self.assertNotEqual(self.src_dir, dst_dir) 918 919 try: 920 shutil.move(self.src_dir, dst_dir) 921 self.assertTrue(os.path.isdir(dst_dir)) 922 finally: 923 if os.path.exists(dst_dir): 924 os.rmdir(dst_dir) 925 926 927 928def test_main(): 929 support.run_unittest(TestShutil, TestMove, TestCopyFile) 930 931if __name__ == '__main__': 932 test_main() 933