• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import contextlib
2import errno
3import importlib
4import io
5import os
6import shutil
7import socket
8import stat
9import subprocess
10import sys
11import tempfile
12import textwrap
13import time
14import unittest
15from test import support
16from test.support import script_helper
17
18TESTFN = support.TESTFN
19
20
21class TestSupport(unittest.TestCase):
22
23    def test_import_module(self):
24        support.import_module("ftplib")
25        self.assertRaises(unittest.SkipTest, support.import_module, "foo")
26
27    def test_import_fresh_module(self):
28        support.import_fresh_module("ftplib")
29
30    def test_get_attribute(self):
31        self.assertEqual(support.get_attribute(self, "test_get_attribute"),
32                        self.test_get_attribute)
33        self.assertRaises(unittest.SkipTest, support.get_attribute, self, "foo")
34
35    @unittest.skip("failing buildbots")
36    def test_get_original_stdout(self):
37        self.assertEqual(support.get_original_stdout(), sys.stdout)
38
39    def test_unload(self):
40        import sched
41        self.assertIn("sched", sys.modules)
42        support.unload("sched")
43        self.assertNotIn("sched", sys.modules)
44
45    def test_unlink(self):
46        with open(TESTFN, "w") as f:
47            pass
48        support.unlink(TESTFN)
49        self.assertFalse(os.path.exists(TESTFN))
50        support.unlink(TESTFN)
51
52    def test_rmtree(self):
53        dirpath = support.TESTFN + 'd'
54        subdirpath = os.path.join(dirpath, 'subdir')
55        os.mkdir(dirpath)
56        os.mkdir(subdirpath)
57        support.rmtree(dirpath)
58        self.assertFalse(os.path.exists(dirpath))
59        with support.swap_attr(support, 'verbose', 0):
60            support.rmtree(dirpath)
61
62        os.mkdir(dirpath)
63        os.mkdir(subdirpath)
64        os.chmod(dirpath, stat.S_IRUSR|stat.S_IXUSR)
65        with support.swap_attr(support, 'verbose', 0):
66            support.rmtree(dirpath)
67        self.assertFalse(os.path.exists(dirpath))
68
69        os.mkdir(dirpath)
70        os.mkdir(subdirpath)
71        os.chmod(dirpath, 0)
72        with support.swap_attr(support, 'verbose', 0):
73            support.rmtree(dirpath)
74        self.assertFalse(os.path.exists(dirpath))
75
76    def test_forget(self):
77        mod_filename = TESTFN + '.py'
78        with open(mod_filename, 'w') as f:
79            print('foo = 1', file=f)
80        sys.path.insert(0, os.curdir)
81        importlib.invalidate_caches()
82        try:
83            mod = __import__(TESTFN)
84            self.assertIn(TESTFN, sys.modules)
85
86            support.forget(TESTFN)
87            self.assertNotIn(TESTFN, sys.modules)
88        finally:
89            del sys.path[0]
90            support.unlink(mod_filename)
91            support.rmtree('__pycache__')
92
93    def test_HOST(self):
94        s = socket.create_server((support.HOST, 0))
95        s.close()
96
97    def test_find_unused_port(self):
98        port = support.find_unused_port()
99        s = socket.create_server((support.HOST, port))
100        s.close()
101
102    def test_bind_port(self):
103        s = socket.socket()
104        support.bind_port(s)
105        s.listen()
106        s.close()
107
108    # Tests for temp_dir()
109
110    def test_temp_dir(self):
111        """Test that temp_dir() creates and destroys its directory."""
112        parent_dir = tempfile.mkdtemp()
113        parent_dir = os.path.realpath(parent_dir)
114
115        try:
116            path = os.path.join(parent_dir, 'temp')
117            self.assertFalse(os.path.isdir(path))
118            with support.temp_dir(path) as temp_path:
119                self.assertEqual(temp_path, path)
120                self.assertTrue(os.path.isdir(path))
121            self.assertFalse(os.path.isdir(path))
122        finally:
123            support.rmtree(parent_dir)
124
125    def test_temp_dir__path_none(self):
126        """Test passing no path."""
127        with support.temp_dir() as temp_path:
128            self.assertTrue(os.path.isdir(temp_path))
129        self.assertFalse(os.path.isdir(temp_path))
130
131    def test_temp_dir__existing_dir__quiet_default(self):
132        """Test passing a directory that already exists."""
133        def call_temp_dir(path):
134            with support.temp_dir(path) as temp_path:
135                raise Exception("should not get here")
136
137        path = tempfile.mkdtemp()
138        path = os.path.realpath(path)
139        try:
140            self.assertTrue(os.path.isdir(path))
141            self.assertRaises(FileExistsError, call_temp_dir, path)
142            # Make sure temp_dir did not delete the original directory.
143            self.assertTrue(os.path.isdir(path))
144        finally:
145            shutil.rmtree(path)
146
147    def test_temp_dir__existing_dir__quiet_true(self):
148        """Test passing a directory that already exists with quiet=True."""
149        path = tempfile.mkdtemp()
150        path = os.path.realpath(path)
151
152        try:
153            with support.check_warnings() as recorder:
154                with support.temp_dir(path, quiet=True) as temp_path:
155                    self.assertEqual(path, temp_path)
156                warnings = [str(w.message) for w in recorder.warnings]
157            # Make sure temp_dir did not delete the original directory.
158            self.assertTrue(os.path.isdir(path))
159        finally:
160            shutil.rmtree(path)
161
162        self.assertEqual(len(warnings), 1, warnings)
163        warn = warnings[0]
164        self.assertTrue(warn.startswith(f'tests may fail, unable to create '
165                                        f'temporary directory {path!r}: '),
166                        warn)
167
168    @unittest.skipUnless(hasattr(os, "fork"), "test requires os.fork")
169    def test_temp_dir__forked_child(self):
170        """Test that a forked child process does not remove the directory."""
171        # See bpo-30028 for details.
172        # Run the test as an external script, because it uses fork.
173        script_helper.assert_python_ok("-c", textwrap.dedent("""
174            import os
175            from test import support
176            with support.temp_cwd() as temp_path:
177                pid = os.fork()
178                if pid != 0:
179                    # parent process (child has pid == 0)
180
181                    # wait for the child to terminate
182                    (pid, status) = os.waitpid(pid, 0)
183                    if status != 0:
184                        raise AssertionError(f"Child process failed with exit "
185                                             f"status indication 0x{status:x}.")
186
187                    # Make sure that temp_path is still present. When the child
188                    # process leaves the 'temp_cwd'-context, the __exit__()-
189                    # method of the context must not remove the temporary
190                    # directory.
191                    if not os.path.isdir(temp_path):
192                        raise AssertionError("Child removed temp_path.")
193        """))
194
195    # Tests for change_cwd()
196
197    def test_change_cwd(self):
198        original_cwd = os.getcwd()
199
200        with support.temp_dir() as temp_path:
201            with support.change_cwd(temp_path) as new_cwd:
202                self.assertEqual(new_cwd, temp_path)
203                self.assertEqual(os.getcwd(), new_cwd)
204
205        self.assertEqual(os.getcwd(), original_cwd)
206
207    def test_change_cwd__non_existent_dir(self):
208        """Test passing a non-existent directory."""
209        original_cwd = os.getcwd()
210
211        def call_change_cwd(path):
212            with support.change_cwd(path) as new_cwd:
213                raise Exception("should not get here")
214
215        with support.temp_dir() as parent_dir:
216            non_existent_dir = os.path.join(parent_dir, 'does_not_exist')
217            self.assertRaises(FileNotFoundError, call_change_cwd,
218                              non_existent_dir)
219
220        self.assertEqual(os.getcwd(), original_cwd)
221
222    def test_change_cwd__non_existent_dir__quiet_true(self):
223        """Test passing a non-existent directory with quiet=True."""
224        original_cwd = os.getcwd()
225
226        with support.temp_dir() as parent_dir:
227            bad_dir = os.path.join(parent_dir, 'does_not_exist')
228            with support.check_warnings() as recorder:
229                with support.change_cwd(bad_dir, quiet=True) as new_cwd:
230                    self.assertEqual(new_cwd, original_cwd)
231                    self.assertEqual(os.getcwd(), new_cwd)
232                warnings = [str(w.message) for w in recorder.warnings]
233
234        self.assertEqual(len(warnings), 1, warnings)
235        warn = warnings[0]
236        self.assertTrue(warn.startswith(f'tests may fail, unable to change '
237                                        f'the current working directory '
238                                        f'to {bad_dir!r}: '),
239                        warn)
240
241    # Tests for change_cwd()
242
243    def test_change_cwd__chdir_warning(self):
244        """Check the warning message when os.chdir() fails."""
245        path = TESTFN + '_does_not_exist'
246        with support.check_warnings() as recorder:
247            with support.change_cwd(path=path, quiet=True):
248                pass
249            messages = [str(w.message) for w in recorder.warnings]
250
251        self.assertEqual(len(messages), 1, messages)
252        msg = messages[0]
253        self.assertTrue(msg.startswith(f'tests may fail, unable to change '
254                                       f'the current working directory '
255                                       f'to {path!r}: '),
256                        msg)
257
258    # Tests for temp_cwd()
259
260    def test_temp_cwd(self):
261        here = os.getcwd()
262        with support.temp_cwd(name=TESTFN):
263            self.assertEqual(os.path.basename(os.getcwd()), TESTFN)
264        self.assertFalse(os.path.exists(TESTFN))
265        self.assertEqual(os.getcwd(), here)
266
267
268    def test_temp_cwd__name_none(self):
269        """Test passing None to temp_cwd()."""
270        original_cwd = os.getcwd()
271        with support.temp_cwd(name=None) as new_cwd:
272            self.assertNotEqual(new_cwd, original_cwd)
273            self.assertTrue(os.path.isdir(new_cwd))
274            self.assertEqual(os.getcwd(), new_cwd)
275        self.assertEqual(os.getcwd(), original_cwd)
276
277    def test_sortdict(self):
278        self.assertEqual(support.sortdict({3:3, 2:2, 1:1}), "{1: 1, 2: 2, 3: 3}")
279
280    def test_make_bad_fd(self):
281        fd = support.make_bad_fd()
282        with self.assertRaises(OSError) as cm:
283            os.write(fd, b"foo")
284        self.assertEqual(cm.exception.errno, errno.EBADF)
285
286    def test_check_syntax_error(self):
287        support.check_syntax_error(self, "def class", lineno=1, offset=5)
288        with self.assertRaises(AssertionError):
289            support.check_syntax_error(self, "x=1")
290
291    def test_CleanImport(self):
292        import importlib
293        with support.CleanImport("asyncore"):
294            importlib.import_module("asyncore")
295
296    def test_DirsOnSysPath(self):
297        with support.DirsOnSysPath('foo', 'bar'):
298            self.assertIn("foo", sys.path)
299            self.assertIn("bar", sys.path)
300        self.assertNotIn("foo", sys.path)
301        self.assertNotIn("bar", sys.path)
302
303    def test_captured_stdout(self):
304        with support.captured_stdout() as stdout:
305            print("hello")
306        self.assertEqual(stdout.getvalue(), "hello\n")
307
308    def test_captured_stderr(self):
309        with support.captured_stderr() as stderr:
310            print("hello", file=sys.stderr)
311        self.assertEqual(stderr.getvalue(), "hello\n")
312
313    def test_captured_stdin(self):
314        with support.captured_stdin() as stdin:
315            stdin.write('hello\n')
316            stdin.seek(0)
317            # call test code that consumes from sys.stdin
318            captured = input()
319        self.assertEqual(captured, "hello")
320
321    def test_gc_collect(self):
322        support.gc_collect()
323
324    def test_python_is_optimized(self):
325        self.assertIsInstance(support.python_is_optimized(), bool)
326
327    def test_swap_attr(self):
328        class Obj:
329            pass
330        obj = Obj()
331        obj.x = 1
332        with support.swap_attr(obj, "x", 5) as x:
333            self.assertEqual(obj.x, 5)
334            self.assertEqual(x, 1)
335        self.assertEqual(obj.x, 1)
336        with support.swap_attr(obj, "y", 5) as y:
337            self.assertEqual(obj.y, 5)
338            self.assertIsNone(y)
339        self.assertFalse(hasattr(obj, 'y'))
340        with support.swap_attr(obj, "y", 5):
341            del obj.y
342        self.assertFalse(hasattr(obj, 'y'))
343
344    def test_swap_item(self):
345        D = {"x":1}
346        with support.swap_item(D, "x", 5) as x:
347            self.assertEqual(D["x"], 5)
348            self.assertEqual(x, 1)
349        self.assertEqual(D["x"], 1)
350        with support.swap_item(D, "y", 5) as y:
351            self.assertEqual(D["y"], 5)
352            self.assertIsNone(y)
353        self.assertNotIn("y", D)
354        with support.swap_item(D, "y", 5):
355            del D["y"]
356        self.assertNotIn("y", D)
357
358    class RefClass:
359        attribute1 = None
360        attribute2 = None
361        _hidden_attribute1 = None
362        __magic_1__ = None
363
364    class OtherClass:
365        attribute2 = None
366        attribute3 = None
367        __magic_1__ = None
368        __magic_2__ = None
369
370    def test_detect_api_mismatch(self):
371        missing_items = support.detect_api_mismatch(self.RefClass,
372                                                    self.OtherClass)
373        self.assertEqual({'attribute1'}, missing_items)
374
375        missing_items = support.detect_api_mismatch(self.OtherClass,
376                                                    self.RefClass)
377        self.assertEqual({'attribute3', '__magic_2__'}, missing_items)
378
379    def test_detect_api_mismatch__ignore(self):
380        ignore = ['attribute1', 'attribute3', '__magic_2__', 'not_in_either']
381
382        missing_items = support.detect_api_mismatch(
383                self.RefClass, self.OtherClass, ignore=ignore)
384        self.assertEqual(set(), missing_items)
385
386        missing_items = support.detect_api_mismatch(
387                self.OtherClass, self.RefClass, ignore=ignore)
388        self.assertEqual(set(), missing_items)
389
390    def test_check__all__(self):
391        extra = {'tempdir'}
392        blacklist = {'template'}
393        support.check__all__(self,
394                             tempfile,
395                             extra=extra,
396                             blacklist=blacklist)
397
398        extra = {'TextTestResult', 'installHandler'}
399        blacklist = {'load_tests', "TestProgram", "BaseTestSuite"}
400
401        support.check__all__(self,
402                             unittest,
403                             ("unittest.result", "unittest.case",
404                              "unittest.suite", "unittest.loader",
405                              "unittest.main", "unittest.runner",
406                              "unittest.signals", "unittest.async_case"),
407                             extra=extra,
408                             blacklist=blacklist)
409
410        self.assertRaises(AssertionError, support.check__all__, self, unittest)
411
412    @unittest.skipUnless(hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG'),
413                         'need os.waitpid() and os.WNOHANG')
414    def test_reap_children(self):
415        # Make sure that there is no other pending child process
416        support.reap_children()
417
418        # Create a child process
419        pid = os.fork()
420        if pid == 0:
421            # child process: do nothing, just exit
422            os._exit(0)
423
424        t0 = time.monotonic()
425        deadline = time.monotonic() + 60.0
426
427        was_altered = support.environment_altered
428        try:
429            support.environment_altered = False
430            stderr = io.StringIO()
431
432            while True:
433                if time.monotonic() > deadline:
434                    self.fail("timeout")
435
436                with contextlib.redirect_stderr(stderr):
437                    support.reap_children()
438
439                # Use environment_altered to check if reap_children() found
440                # the child process
441                if support.environment_altered:
442                    break
443
444                # loop until the child process completed
445                time.sleep(0.100)
446
447            msg = "Warning -- reap_children() reaped child process %s" % pid
448            self.assertIn(msg, stderr.getvalue())
449            self.assertTrue(support.environment_altered)
450        finally:
451            support.environment_altered = was_altered
452
453        # Just in case, check again that there is no other
454        # pending child process
455        support.reap_children()
456
457    def check_options(self, args, func, expected=None):
458        code = f'from test.support import {func}; print(repr({func}()))'
459        cmd = [sys.executable, *args, '-c', code]
460        env = {key: value for key, value in os.environ.items()
461               if not key.startswith('PYTHON')}
462        proc = subprocess.run(cmd,
463                              stdout=subprocess.PIPE,
464                              stderr=subprocess.DEVNULL,
465                              universal_newlines=True,
466                              env=env)
467        if expected is None:
468            expected = args
469        self.assertEqual(proc.stdout.rstrip(), repr(expected))
470        self.assertEqual(proc.returncode, 0)
471
472    def test_args_from_interpreter_flags(self):
473        # Test test.support.args_from_interpreter_flags()
474        for opts in (
475            # no option
476            [],
477            # single option
478            ['-B'],
479            ['-s'],
480            ['-S'],
481            ['-E'],
482            ['-v'],
483            ['-b'],
484            ['-q'],
485            ['-I'],
486            # same option multiple times
487            ['-bb'],
488            ['-vvv'],
489            # -W options
490            ['-Wignore'],
491            # -X options
492            ['-X', 'dev'],
493            ['-Wignore', '-X', 'dev'],
494            ['-X', 'faulthandler'],
495            ['-X', 'importtime'],
496            ['-X', 'showalloccount'],
497            ['-X', 'showrefcount'],
498            ['-X', 'tracemalloc'],
499            ['-X', 'tracemalloc=3'],
500        ):
501            with self.subTest(opts=opts):
502                self.check_options(opts, 'args_from_interpreter_flags')
503
504        self.check_options(['-I', '-E', '-s'], 'args_from_interpreter_flags',
505                           ['-I'])
506
507    def test_optim_args_from_interpreter_flags(self):
508        # Test test.support.optim_args_from_interpreter_flags()
509        for opts in (
510            # no option
511            [],
512            ['-O'],
513            ['-OO'],
514            ['-OOOO'],
515        ):
516            with self.subTest(opts=opts):
517                self.check_options(opts, 'optim_args_from_interpreter_flags')
518
519    def test_match_test(self):
520        class Test:
521            def __init__(self, test_id):
522                self.test_id = test_id
523
524            def id(self):
525                return self.test_id
526
527        test_access = Test('test.test_os.FileTests.test_access')
528        test_chdir = Test('test.test_os.Win32ErrorTests.test_chdir')
529
530        with support.swap_attr(support, '_match_test_func', None):
531            # match all
532            support.set_match_tests([])
533            self.assertTrue(support.match_test(test_access))
534            self.assertTrue(support.match_test(test_chdir))
535
536            # match all using None
537            support.set_match_tests(None)
538            self.assertTrue(support.match_test(test_access))
539            self.assertTrue(support.match_test(test_chdir))
540
541            # match the full test identifier
542            support.set_match_tests([test_access.id()])
543            self.assertTrue(support.match_test(test_access))
544            self.assertFalse(support.match_test(test_chdir))
545
546            # match the module name
547            support.set_match_tests(['test_os'])
548            self.assertTrue(support.match_test(test_access))
549            self.assertTrue(support.match_test(test_chdir))
550
551            # Test '*' pattern
552            support.set_match_tests(['test_*'])
553            self.assertTrue(support.match_test(test_access))
554            self.assertTrue(support.match_test(test_chdir))
555
556            # Test case sensitivity
557            support.set_match_tests(['filetests'])
558            self.assertFalse(support.match_test(test_access))
559            support.set_match_tests(['FileTests'])
560            self.assertTrue(support.match_test(test_access))
561
562            # Test pattern containing '.' and a '*' metacharacter
563            support.set_match_tests(['*test_os.*.test_*'])
564            self.assertTrue(support.match_test(test_access))
565            self.assertTrue(support.match_test(test_chdir))
566
567            # Multiple patterns
568            support.set_match_tests([test_access.id(), test_chdir.id()])
569            self.assertTrue(support.match_test(test_access))
570            self.assertTrue(support.match_test(test_chdir))
571
572            support.set_match_tests(['test_access', 'DONTMATCH'])
573            self.assertTrue(support.match_test(test_access))
574            self.assertFalse(support.match_test(test_chdir))
575
576    def test_fd_count(self):
577        # We cannot test the absolute value of fd_count(): on old Linux
578        # kernel or glibc versions, os.urandom() keeps a FD open on
579        # /dev/urandom device and Python has 4 FD opens instead of 3.
580        start = support.fd_count()
581        fd = os.open(__file__, os.O_RDONLY)
582        try:
583            more = support.fd_count()
584        finally:
585            os.close(fd)
586        self.assertEqual(more - start, 1)
587
588    # XXX -follows a list of untested API
589    # make_legacy_pyc
590    # is_resource_enabled
591    # requires
592    # fcmp
593    # umaks
594    # findfile
595    # check_warnings
596    # EnvironmentVarGuard
597    # TransientResource
598    # transient_internet
599    # run_with_locale
600    # set_memlimit
601    # bigmemtest
602    # precisionbigmemtest
603    # bigaddrspacetest
604    # requires_resource
605    # run_doctest
606    # threading_cleanup
607    # reap_threads
608    # strip_python_stderr
609    # can_symlink
610    # skip_unless_symlink
611    # SuppressCrashReport
612
613
614def test_main():
615    tests = [TestSupport]
616    support.run_unittest(*tests)
617
618if __name__ == '__main__':
619    test_main()
620