1from test.support.import_helper import unload, CleanImport 2from test.support.warnings_helper import check_warnings 3import unittest 4import sys 5import importlib 6from importlib.util import spec_from_file_location 7import pkgutil 8import os 9import os.path 10import tempfile 11import shutil 12import zipfile 13 14# Note: pkgutil.walk_packages is currently tested in test_runpy. This is 15# a hack to get a major issue resolved for 3.3b2. Longer term, it should 16# be moved back here, perhaps by factoring out the helper code for 17# creating interesting package layouts to a separate module. 18# Issue #15348 declares this is indeed a dodgy hack ;) 19 20class PkgutilTests(unittest.TestCase): 21 22 def setUp(self): 23 self.dirname = tempfile.mkdtemp() 24 self.addCleanup(shutil.rmtree, self.dirname) 25 sys.path.insert(0, self.dirname) 26 27 def tearDown(self): 28 del sys.path[0] 29 30 def test_getdata_filesys(self): 31 pkg = 'test_getdata_filesys' 32 33 # Include a LF and a CRLF, to test that binary data is read back 34 RESOURCE_DATA = b'Hello, world!\nSecond line\r\nThird line' 35 36 # Make a package with some resources 37 package_dir = os.path.join(self.dirname, pkg) 38 os.mkdir(package_dir) 39 # Empty init.py 40 f = open(os.path.join(package_dir, '__init__.py'), "wb") 41 f.close() 42 # Resource files, res.txt, sub/res.txt 43 f = open(os.path.join(package_dir, 'res.txt'), "wb") 44 f.write(RESOURCE_DATA) 45 f.close() 46 os.mkdir(os.path.join(package_dir, 'sub')) 47 f = open(os.path.join(package_dir, 'sub', 'res.txt'), "wb") 48 f.write(RESOURCE_DATA) 49 f.close() 50 51 # Check we can read the resources 52 res1 = pkgutil.get_data(pkg, 'res.txt') 53 self.assertEqual(res1, RESOURCE_DATA) 54 res2 = pkgutil.get_data(pkg, 'sub/res.txt') 55 self.assertEqual(res2, RESOURCE_DATA) 56 57 del sys.modules[pkg] 58 59 def test_getdata_zipfile(self): 60 zip = 'test_getdata_zipfile.zip' 61 pkg = 'test_getdata_zipfile' 62 63 # Include a LF and a CRLF, to test that binary data is read back 64 RESOURCE_DATA = b'Hello, world!\nSecond line\r\nThird line' 65 66 # Make a package with some resources 67 zip_file = os.path.join(self.dirname, zip) 68 z = zipfile.ZipFile(zip_file, 'w') 69 70 # Empty init.py 71 z.writestr(pkg + '/__init__.py', "") 72 # Resource files, res.txt, sub/res.txt 73 z.writestr(pkg + '/res.txt', RESOURCE_DATA) 74 z.writestr(pkg + '/sub/res.txt', RESOURCE_DATA) 75 z.close() 76 77 # Check we can read the resources 78 sys.path.insert(0, zip_file) 79 res1 = pkgutil.get_data(pkg, 'res.txt') 80 self.assertEqual(res1, RESOURCE_DATA) 81 res2 = pkgutil.get_data(pkg, 'sub/res.txt') 82 self.assertEqual(res2, RESOURCE_DATA) 83 84 names = [] 85 for moduleinfo in pkgutil.iter_modules([zip_file]): 86 self.assertIsInstance(moduleinfo, pkgutil.ModuleInfo) 87 names.append(moduleinfo.name) 88 self.assertEqual(names, ['test_getdata_zipfile']) 89 90 del sys.path[0] 91 92 del sys.modules[pkg] 93 94 def test_unreadable_dir_on_syspath(self): 95 # issue7367 - walk_packages failed if unreadable dir on sys.path 96 package_name = "unreadable_package" 97 d = os.path.join(self.dirname, package_name) 98 # this does not appear to create an unreadable dir on Windows 99 # but the test should not fail anyway 100 os.mkdir(d, 0) 101 self.addCleanup(os.rmdir, d) 102 for t in pkgutil.walk_packages(path=[self.dirname]): 103 self.fail("unexpected package found") 104 105 def test_walkpackages_filesys(self): 106 pkg1 = 'test_walkpackages_filesys' 107 pkg1_dir = os.path.join(self.dirname, pkg1) 108 os.mkdir(pkg1_dir) 109 f = open(os.path.join(pkg1_dir, '__init__.py'), "wb") 110 f.close() 111 os.mkdir(os.path.join(pkg1_dir, 'sub')) 112 f = open(os.path.join(pkg1_dir, 'sub', '__init__.py'), "wb") 113 f.close() 114 f = open(os.path.join(pkg1_dir, 'sub', 'mod.py'), "wb") 115 f.close() 116 117 # Now, to juice it up, let's add the opposite packages, too. 118 pkg2 = 'sub' 119 pkg2_dir = os.path.join(self.dirname, pkg2) 120 os.mkdir(pkg2_dir) 121 f = open(os.path.join(pkg2_dir, '__init__.py'), "wb") 122 f.close() 123 os.mkdir(os.path.join(pkg2_dir, 'test_walkpackages_filesys')) 124 f = open(os.path.join(pkg2_dir, 'test_walkpackages_filesys', '__init__.py'), "wb") 125 f.close() 126 f = open(os.path.join(pkg2_dir, 'test_walkpackages_filesys', 'mod.py'), "wb") 127 f.close() 128 129 expected = [ 130 'sub', 131 'sub.test_walkpackages_filesys', 132 'sub.test_walkpackages_filesys.mod', 133 'test_walkpackages_filesys', 134 'test_walkpackages_filesys.sub', 135 'test_walkpackages_filesys.sub.mod', 136 ] 137 actual= [e[1] for e in pkgutil.walk_packages([self.dirname])] 138 self.assertEqual(actual, expected) 139 140 for pkg in expected: 141 if pkg.endswith('mod'): 142 continue 143 del sys.modules[pkg] 144 145 def test_walkpackages_zipfile(self): 146 """Tests the same as test_walkpackages_filesys, only with a zip file.""" 147 148 zip = 'test_walkpackages_zipfile.zip' 149 pkg1 = 'test_walkpackages_zipfile' 150 pkg2 = 'sub' 151 152 zip_file = os.path.join(self.dirname, zip) 153 z = zipfile.ZipFile(zip_file, 'w') 154 z.writestr(pkg2 + '/__init__.py', "") 155 z.writestr(pkg2 + '/' + pkg1 + '/__init__.py', "") 156 z.writestr(pkg2 + '/' + pkg1 + '/mod.py', "") 157 z.writestr(pkg1 + '/__init__.py', "") 158 z.writestr(pkg1 + '/' + pkg2 + '/__init__.py', "") 159 z.writestr(pkg1 + '/' + pkg2 + '/mod.py', "") 160 z.close() 161 162 sys.path.insert(0, zip_file) 163 expected = [ 164 'sub', 165 'sub.test_walkpackages_zipfile', 166 'sub.test_walkpackages_zipfile.mod', 167 'test_walkpackages_zipfile', 168 'test_walkpackages_zipfile.sub', 169 'test_walkpackages_zipfile.sub.mod', 170 ] 171 actual= [e[1] for e in pkgutil.walk_packages([zip_file])] 172 self.assertEqual(actual, expected) 173 del sys.path[0] 174 175 for pkg in expected: 176 if pkg.endswith('mod'): 177 continue 178 del sys.modules[pkg] 179 180 def test_walk_packages_raises_on_string_or_bytes_input(self): 181 182 str_input = 'test_dir' 183 with self.assertRaises((TypeError, ValueError)): 184 list(pkgutil.walk_packages(str_input)) 185 186 bytes_input = b'test_dir' 187 with self.assertRaises((TypeError, ValueError)): 188 list(pkgutil.walk_packages(bytes_input)) 189 190 def test_name_resolution(self): 191 import logging 192 import logging.handlers 193 194 success_cases = ( 195 ('os', os), 196 ('os.path', os.path), 197 ('os.path:pathsep', os.path.pathsep), 198 ('logging', logging), 199 ('logging:', logging), 200 ('logging.handlers', logging.handlers), 201 ('logging.handlers:', logging.handlers), 202 ('logging.handlers:SysLogHandler', logging.handlers.SysLogHandler), 203 ('logging.handlers.SysLogHandler', logging.handlers.SysLogHandler), 204 ('logging.handlers:SysLogHandler.LOG_ALERT', 205 logging.handlers.SysLogHandler.LOG_ALERT), 206 ('logging.handlers.SysLogHandler.LOG_ALERT', 207 logging.handlers.SysLogHandler.LOG_ALERT), 208 ('builtins.int', int), 209 ('builtins:int', int), 210 ('builtins.int.from_bytes', int.from_bytes), 211 ('builtins:int.from_bytes', int.from_bytes), 212 ('builtins.ZeroDivisionError', ZeroDivisionError), 213 ('builtins:ZeroDivisionError', ZeroDivisionError), 214 ('os:path', os.path), 215 ) 216 217 failure_cases = ( 218 (None, TypeError), 219 (1, TypeError), 220 (2.0, TypeError), 221 (True, TypeError), 222 ('', ValueError), 223 ('?abc', ValueError), 224 ('abc/foo', ValueError), 225 ('foo', ImportError), 226 ('os.foo', AttributeError), 227 ('os.foo:', ImportError), 228 ('os.pth:pathsep', ImportError), 229 ('logging.handlers:NoSuchHandler', AttributeError), 230 ('logging.handlers:SysLogHandler.NO_SUCH_VALUE', AttributeError), 231 ('logging.handlers.SysLogHandler.NO_SUCH_VALUE', AttributeError), 232 ('ZeroDivisionError', ImportError), 233 ('os.path.9abc', ValueError), 234 ('9abc', ValueError), 235 ) 236 237 # add some Unicode package names to the mix. 238 239 unicode_words = ('\u0935\u092e\u0938', 240 '\xe9', '\xc8', 241 '\uc548\ub155\ud558\uc138\uc694', 242 '\u3055\u3088\u306a\u3089', 243 '\u3042\u308a\u304c\u3068\u3046', 244 '\u0425\u043e\u0440\u043e\u0448\u043e', 245 '\u0441\u043f\u0430\u0441\u0438\u0431\u043e', 246 '\u73b0\u4ee3\u6c49\u8bed\u5e38\u7528\u5b57\u8868') 247 248 for uw in unicode_words: 249 d = os.path.join(self.dirname, uw) 250 try: 251 os.makedirs(d, exist_ok=True) 252 except UnicodeEncodeError: 253 # When filesystem encoding cannot encode uw: skip this test 254 continue 255 # make an empty __init__.py file 256 f = os.path.join(d, '__init__.py') 257 with open(f, 'w') as f: 258 f.write('') 259 f.flush() 260 # now import the package we just created; clearing the caches is 261 # needed, otherwise the newly created package isn't found 262 importlib.invalidate_caches() 263 mod = importlib.import_module(uw) 264 success_cases += (uw, mod), 265 if len(uw) > 1: 266 failure_cases += (uw[:-1], ImportError), 267 268 # add an example with a Unicode digit at the start 269 failure_cases += ('\u0966\u0935\u092e\u0938', ValueError), 270 271 for s, expected in success_cases: 272 with self.subTest(s=s): 273 o = pkgutil.resolve_name(s) 274 self.assertEqual(o, expected) 275 276 for s, exc in failure_cases: 277 with self.subTest(s=s): 278 with self.assertRaises(exc): 279 pkgutil.resolve_name(s) 280 281 282class PkgutilPEP302Tests(unittest.TestCase): 283 284 class MyTestLoader(object): 285 def create_module(self, spec): 286 return None 287 288 def exec_module(self, mod): 289 # Count how many times the module is reloaded 290 mod.__dict__['loads'] = mod.__dict__.get('loads', 0) + 1 291 292 def get_data(self, path): 293 return "Hello, world!" 294 295 class MyTestImporter(object): 296 def find_spec(self, fullname, path=None, target=None): 297 loader = PkgutilPEP302Tests.MyTestLoader() 298 return spec_from_file_location(fullname, 299 '<%s>' % loader.__class__.__name__, 300 loader=loader, 301 submodule_search_locations=[]) 302 303 def setUp(self): 304 sys.meta_path.insert(0, self.MyTestImporter()) 305 306 def tearDown(self): 307 del sys.meta_path[0] 308 309 def test_getdata_pep302(self): 310 # Use a dummy finder/loader 311 self.assertEqual(pkgutil.get_data('foo', 'dummy'), "Hello, world!") 312 del sys.modules['foo'] 313 314 def test_alreadyloaded(self): 315 # Ensure that get_data works without reloading - the "loads" module 316 # variable in the example loader should count how many times a reload 317 # occurs. 318 import foo 319 self.assertEqual(foo.loads, 1) 320 self.assertEqual(pkgutil.get_data('foo', 'dummy'), "Hello, world!") 321 self.assertEqual(foo.loads, 1) 322 del sys.modules['foo'] 323 324 325# These tests, especially the setup and cleanup, are hideous. They 326# need to be cleaned up once issue 14715 is addressed. 327class ExtendPathTests(unittest.TestCase): 328 def create_init(self, pkgname): 329 dirname = tempfile.mkdtemp() 330 sys.path.insert(0, dirname) 331 332 pkgdir = os.path.join(dirname, pkgname) 333 os.mkdir(pkgdir) 334 with open(os.path.join(pkgdir, '__init__.py'), 'w') as fl: 335 fl.write('from pkgutil import extend_path\n__path__ = extend_path(__path__, __name__)\n') 336 337 return dirname 338 339 def create_submodule(self, dirname, pkgname, submodule_name, value): 340 module_name = os.path.join(dirname, pkgname, submodule_name + '.py') 341 with open(module_name, 'w') as fl: 342 print('value={}'.format(value), file=fl) 343 344 def test_simple(self): 345 pkgname = 'foo' 346 dirname_0 = self.create_init(pkgname) 347 dirname_1 = self.create_init(pkgname) 348 self.create_submodule(dirname_0, pkgname, 'bar', 0) 349 self.create_submodule(dirname_1, pkgname, 'baz', 1) 350 import foo.bar 351 import foo.baz 352 # Ensure we read the expected values 353 self.assertEqual(foo.bar.value, 0) 354 self.assertEqual(foo.baz.value, 1) 355 356 # Ensure the path is set up correctly 357 self.assertEqual(sorted(foo.__path__), 358 sorted([os.path.join(dirname_0, pkgname), 359 os.path.join(dirname_1, pkgname)])) 360 361 # Cleanup 362 shutil.rmtree(dirname_0) 363 shutil.rmtree(dirname_1) 364 del sys.path[0] 365 del sys.path[0] 366 del sys.modules['foo'] 367 del sys.modules['foo.bar'] 368 del sys.modules['foo.baz'] 369 370 371 # Another awful testing hack to be cleaned up once the test_runpy 372 # helpers are factored out to a common location 373 def test_iter_importers(self): 374 iter_importers = pkgutil.iter_importers 375 get_importer = pkgutil.get_importer 376 377 pkgname = 'spam' 378 modname = 'eggs' 379 dirname = self.create_init(pkgname) 380 pathitem = os.path.join(dirname, pkgname) 381 fullname = '{}.{}'.format(pkgname, modname) 382 sys.modules.pop(fullname, None) 383 sys.modules.pop(pkgname, None) 384 try: 385 self.create_submodule(dirname, pkgname, modname, 0) 386 387 importlib.import_module(fullname) 388 389 importers = list(iter_importers(fullname)) 390 expected_importer = get_importer(pathitem) 391 for finder in importers: 392 spec = pkgutil._get_spec(finder, fullname) 393 loader = spec.loader 394 try: 395 loader = loader.loader 396 except AttributeError: 397 # For now we still allow raw loaders from 398 # find_module(). 399 pass 400 self.assertIsInstance(finder, importlib.machinery.FileFinder) 401 self.assertEqual(finder, expected_importer) 402 self.assertIsInstance(loader, 403 importlib.machinery.SourceFileLoader) 404 self.assertIsNone(pkgutil._get_spec(finder, pkgname)) 405 406 with self.assertRaises(ImportError): 407 list(iter_importers('invalid.module')) 408 409 with self.assertRaises(ImportError): 410 list(iter_importers('.spam')) 411 finally: 412 shutil.rmtree(dirname) 413 del sys.path[0] 414 try: 415 del sys.modules['spam'] 416 del sys.modules['spam.eggs'] 417 except KeyError: 418 pass 419 420 421 def test_mixed_namespace(self): 422 pkgname = 'foo' 423 dirname_0 = self.create_init(pkgname) 424 dirname_1 = self.create_init(pkgname) 425 self.create_submodule(dirname_0, pkgname, 'bar', 0) 426 # Turn this into a PEP 420 namespace package 427 os.unlink(os.path.join(dirname_0, pkgname, '__init__.py')) 428 self.create_submodule(dirname_1, pkgname, 'baz', 1) 429 import foo.bar 430 import foo.baz 431 # Ensure we read the expected values 432 self.assertEqual(foo.bar.value, 0) 433 self.assertEqual(foo.baz.value, 1) 434 435 # Ensure the path is set up correctly 436 self.assertEqual(sorted(foo.__path__), 437 sorted([os.path.join(dirname_0, pkgname), 438 os.path.join(dirname_1, pkgname)])) 439 440 # Cleanup 441 shutil.rmtree(dirname_0) 442 shutil.rmtree(dirname_1) 443 del sys.path[0] 444 del sys.path[0] 445 del sys.modules['foo'] 446 del sys.modules['foo.bar'] 447 del sys.modules['foo.baz'] 448 449 # XXX: test .pkg files 450 451 452class NestedNamespacePackageTest(unittest.TestCase): 453 454 def setUp(self): 455 self.basedir = tempfile.mkdtemp() 456 self.old_path = sys.path[:] 457 458 def tearDown(self): 459 sys.path[:] = self.old_path 460 shutil.rmtree(self.basedir) 461 462 def create_module(self, name, contents): 463 base, final = name.rsplit('.', 1) 464 base_path = os.path.join(self.basedir, base.replace('.', os.path.sep)) 465 os.makedirs(base_path, exist_ok=True) 466 with open(os.path.join(base_path, final + ".py"), 'w') as f: 467 f.write(contents) 468 469 def test_nested(self): 470 pkgutil_boilerplate = ( 471 'import pkgutil; ' 472 '__path__ = pkgutil.extend_path(__path__, __name__)') 473 self.create_module('a.pkg.__init__', pkgutil_boilerplate) 474 self.create_module('b.pkg.__init__', pkgutil_boilerplate) 475 self.create_module('a.pkg.subpkg.__init__', pkgutil_boilerplate) 476 self.create_module('b.pkg.subpkg.__init__', pkgutil_boilerplate) 477 self.create_module('a.pkg.subpkg.c', 'c = 1') 478 self.create_module('b.pkg.subpkg.d', 'd = 2') 479 sys.path.insert(0, os.path.join(self.basedir, 'a')) 480 sys.path.insert(0, os.path.join(self.basedir, 'b')) 481 import pkg 482 self.addCleanup(unload, 'pkg') 483 self.assertEqual(len(pkg.__path__), 2) 484 import pkg.subpkg 485 self.addCleanup(unload, 'pkg.subpkg') 486 self.assertEqual(len(pkg.subpkg.__path__), 2) 487 from pkg.subpkg.c import c 488 from pkg.subpkg.d import d 489 self.assertEqual(c, 1) 490 self.assertEqual(d, 2) 491 492 493class ImportlibMigrationTests(unittest.TestCase): 494 # With full PEP 302 support in the standard import machinery, the 495 # PEP 302 emulation in this module is in the process of being 496 # deprecated in favour of importlib proper 497 498 def check_deprecated(self): 499 return check_warnings( 500 ("This emulation is deprecated and slated for removal in " 501 "Python 3.12; use 'importlib' instead", 502 DeprecationWarning)) 503 504 def test_importer_deprecated(self): 505 with self.check_deprecated(): 506 pkgutil.ImpImporter("") 507 508 def test_loader_deprecated(self): 509 with self.check_deprecated(): 510 pkgutil.ImpLoader("", "", "", "") 511 512 def test_get_loader_avoids_emulation(self): 513 with check_warnings() as w: 514 self.assertIsNotNone(pkgutil.get_loader("sys")) 515 self.assertIsNotNone(pkgutil.get_loader("os")) 516 self.assertIsNotNone(pkgutil.get_loader("test.support")) 517 self.assertEqual(len(w.warnings), 0) 518 519 @unittest.skipIf(__name__ == '__main__', 'not compatible with __main__') 520 def test_get_loader_handles_missing_loader_attribute(self): 521 global __loader__ 522 this_loader = __loader__ 523 del __loader__ 524 try: 525 with check_warnings() as w: 526 self.assertIsNotNone(pkgutil.get_loader(__name__)) 527 self.assertEqual(len(w.warnings), 0) 528 finally: 529 __loader__ = this_loader 530 531 def test_get_loader_handles_missing_spec_attribute(self): 532 name = 'spam' 533 mod = type(sys)(name) 534 del mod.__spec__ 535 with CleanImport(name): 536 sys.modules[name] = mod 537 loader = pkgutil.get_loader(name) 538 self.assertIsNone(loader) 539 540 def test_get_loader_handles_spec_attribute_none(self): 541 name = 'spam' 542 mod = type(sys)(name) 543 mod.__spec__ = None 544 with CleanImport(name): 545 sys.modules[name] = mod 546 loader = pkgutil.get_loader(name) 547 self.assertIsNone(loader) 548 549 def test_get_loader_None_in_sys_modules(self): 550 name = 'totally bogus' 551 sys.modules[name] = None 552 try: 553 loader = pkgutil.get_loader(name) 554 finally: 555 del sys.modules[name] 556 self.assertIsNone(loader) 557 558 def test_find_loader_missing_module(self): 559 name = 'totally bogus' 560 loader = pkgutil.find_loader(name) 561 self.assertIsNone(loader) 562 563 def test_find_loader_avoids_emulation(self): 564 with check_warnings() as w: 565 self.assertIsNotNone(pkgutil.find_loader("sys")) 566 self.assertIsNotNone(pkgutil.find_loader("os")) 567 self.assertIsNotNone(pkgutil.find_loader("test.support")) 568 self.assertEqual(len(w.warnings), 0) 569 570 def test_get_importer_avoids_emulation(self): 571 # We use an illegal path so *none* of the path hooks should fire 572 with check_warnings() as w: 573 self.assertIsNone(pkgutil.get_importer("*??")) 574 self.assertEqual(len(w.warnings), 0) 575 576 def test_iter_importers_avoids_emulation(self): 577 with check_warnings() as w: 578 for importer in pkgutil.iter_importers(): pass 579 self.assertEqual(len(w.warnings), 0) 580 581 582def tearDownModule(): 583 # this is necessary if test is run repeated (like when finding leaks) 584 import zipimport 585 import importlib 586 zipimport._zip_directory_cache.clear() 587 importlib.invalidate_caches() 588 589 590if __name__ == '__main__': 591 unittest.main() 592