• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import gc
2import io
3import os
4import sys
5import signal
6import weakref
7
8import unittest
9
10
11@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
12@unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
13@unittest.skipIf(sys.platform == 'freebsd6', "Test kills regrtest on freebsd6 "
14    "if threads have been used")
15class TestBreak(unittest.TestCase):
16    int_handler = None
17
18    def setUp(self):
19        self._default_handler = signal.getsignal(signal.SIGINT)
20        if self.int_handler is not None:
21            signal.signal(signal.SIGINT, self.int_handler)
22
23    def tearDown(self):
24        signal.signal(signal.SIGINT, self._default_handler)
25        unittest.signals._results = weakref.WeakKeyDictionary()
26        unittest.signals._interrupt_handler = None
27
28
29    def testInstallHandler(self):
30        default_handler = signal.getsignal(signal.SIGINT)
31        unittest.installHandler()
32        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
33
34        try:
35            pid = os.getpid()
36            os.kill(pid, signal.SIGINT)
37        except KeyboardInterrupt:
38            self.fail("KeyboardInterrupt not handled")
39
40        self.assertTrue(unittest.signals._interrupt_handler.called)
41
42    def testRegisterResult(self):
43        result = unittest.TestResult()
44        unittest.registerResult(result)
45
46        for ref in unittest.signals._results:
47            if ref is result:
48                break
49            elif ref is not result:
50                self.fail("odd object in result set")
51        else:
52            self.fail("result not found")
53
54
55    def testInterruptCaught(self):
56        default_handler = signal.getsignal(signal.SIGINT)
57
58        result = unittest.TestResult()
59        unittest.installHandler()
60        unittest.registerResult(result)
61
62        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
63
64        def test(result):
65            pid = os.getpid()
66            os.kill(pid, signal.SIGINT)
67            result.breakCaught = True
68            self.assertTrue(result.shouldStop)
69
70        try:
71            test(result)
72        except KeyboardInterrupt:
73            self.fail("KeyboardInterrupt not handled")
74        self.assertTrue(result.breakCaught)
75
76
77    def testSecondInterrupt(self):
78        # Can't use skipIf decorator because the signal handler may have
79        # been changed after defining this method.
80        if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
81            self.skipTest("test requires SIGINT to not be ignored")
82        result = unittest.TestResult()
83        unittest.installHandler()
84        unittest.registerResult(result)
85
86        def test(result):
87            pid = os.getpid()
88            os.kill(pid, signal.SIGINT)
89            result.breakCaught = True
90            self.assertTrue(result.shouldStop)
91            os.kill(pid, signal.SIGINT)
92            self.fail("Second KeyboardInterrupt not raised")
93
94        try:
95            test(result)
96        except KeyboardInterrupt:
97            pass
98        else:
99            self.fail("Second KeyboardInterrupt not raised")
100        self.assertTrue(result.breakCaught)
101
102
103    def testTwoResults(self):
104        unittest.installHandler()
105
106        result = unittest.TestResult()
107        unittest.registerResult(result)
108        new_handler = signal.getsignal(signal.SIGINT)
109
110        result2 = unittest.TestResult()
111        unittest.registerResult(result2)
112        self.assertEqual(signal.getsignal(signal.SIGINT), new_handler)
113
114        result3 = unittest.TestResult()
115
116        def test(result):
117            pid = os.getpid()
118            os.kill(pid, signal.SIGINT)
119
120        try:
121            test(result)
122        except KeyboardInterrupt:
123            self.fail("KeyboardInterrupt not handled")
124
125        self.assertTrue(result.shouldStop)
126        self.assertTrue(result2.shouldStop)
127        self.assertFalse(result3.shouldStop)
128
129
130    def testHandlerReplacedButCalled(self):
131        # Can't use skipIf decorator because the signal handler may have
132        # been changed after defining this method.
133        if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
134            self.skipTest("test requires SIGINT to not be ignored")
135        # If our handler has been replaced (is no longer installed) but is
136        # called by the *new* handler, then it isn't safe to delay the
137        # SIGINT and we should immediately delegate to the default handler
138        unittest.installHandler()
139
140        handler = signal.getsignal(signal.SIGINT)
141        def new_handler(frame, signum):
142            handler(frame, signum)
143        signal.signal(signal.SIGINT, new_handler)
144
145        try:
146            pid = os.getpid()
147            os.kill(pid, signal.SIGINT)
148        except KeyboardInterrupt:
149            pass
150        else:
151            self.fail("replaced but delegated handler doesn't raise interrupt")
152
153    def testRunner(self):
154        # Creating a TextTestRunner with the appropriate argument should
155        # register the TextTestResult it creates
156        runner = unittest.TextTestRunner(stream=io.StringIO())
157
158        result = runner.run(unittest.TestSuite())
159        self.assertIn(result, unittest.signals._results)
160
161    def testWeakReferences(self):
162        # Calling registerResult on a result should not keep it alive
163        result = unittest.TestResult()
164        unittest.registerResult(result)
165
166        ref = weakref.ref(result)
167        del result
168
169        # For non-reference counting implementations
170        gc.collect();gc.collect()
171        self.assertIsNone(ref())
172
173
174    def testRemoveResult(self):
175        result = unittest.TestResult()
176        unittest.registerResult(result)
177
178        unittest.installHandler()
179        self.assertTrue(unittest.removeResult(result))
180
181        # Should this raise an error instead?
182        self.assertFalse(unittest.removeResult(unittest.TestResult()))
183
184        try:
185            pid = os.getpid()
186            os.kill(pid, signal.SIGINT)
187        except KeyboardInterrupt:
188            pass
189
190        self.assertFalse(result.shouldStop)
191
192    def testMainInstallsHandler(self):
193        failfast = object()
194        test = object()
195        verbosity = object()
196        result = object()
197        default_handler = signal.getsignal(signal.SIGINT)
198
199        class FakeRunner(object):
200            initArgs = []
201            runArgs = []
202            def __init__(self, *args, **kwargs):
203                self.initArgs.append((args, kwargs))
204            def run(self, test):
205                self.runArgs.append(test)
206                return result
207
208        class Program(unittest.TestProgram):
209            def __init__(self, catchbreak):
210                self.exit = False
211                self.verbosity = verbosity
212                self.failfast = failfast
213                self.catchbreak = catchbreak
214                self.tb_locals = False
215                self.testRunner = FakeRunner
216                self.test = test
217                self.result = None
218
219        p = Program(False)
220        p.runTests()
221
222        self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None,
223                                                     'verbosity': verbosity,
224                                                     'failfast': failfast,
225                                                     'tb_locals': False,
226                                                     'warnings': None})])
227        self.assertEqual(FakeRunner.runArgs, [test])
228        self.assertEqual(p.result, result)
229
230        self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
231
232        FakeRunner.initArgs = []
233        FakeRunner.runArgs = []
234        p = Program(True)
235        p.runTests()
236
237        self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None,
238                                                     'verbosity': verbosity,
239                                                     'failfast': failfast,
240                                                     'tb_locals': False,
241                                                     'warnings': None})])
242        self.assertEqual(FakeRunner.runArgs, [test])
243        self.assertEqual(p.result, result)
244
245        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
246
247    def testRemoveHandler(self):
248        default_handler = signal.getsignal(signal.SIGINT)
249        unittest.installHandler()
250        unittest.removeHandler()
251        self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
252
253        # check that calling removeHandler multiple times has no ill-effect
254        unittest.removeHandler()
255        self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
256
257    def testRemoveHandlerAsDecorator(self):
258        default_handler = signal.getsignal(signal.SIGINT)
259        unittest.installHandler()
260
261        @unittest.removeHandler
262        def test():
263            self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
264
265        test()
266        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
267
268@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
269@unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
270@unittest.skipIf(sys.platform == 'freebsd6', "Test kills regrtest on freebsd6 "
271    "if threads have been used")
272class TestBreakDefaultIntHandler(TestBreak):
273    int_handler = signal.default_int_handler
274
275@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
276@unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
277@unittest.skipIf(sys.platform == 'freebsd6', "Test kills regrtest on freebsd6 "
278    "if threads have been used")
279class TestBreakSignalIgnored(TestBreak):
280    int_handler = signal.SIG_IGN
281
282@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
283@unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
284@unittest.skipIf(sys.platform == 'freebsd6', "Test kills regrtest on freebsd6 "
285    "if threads have been used")
286class TestBreakSignalDefault(TestBreak):
287    int_handler = signal.SIG_DFL
288
289
290if __name__ == "__main__":
291    unittest.main()
292