1import gc 2import io 3import os 4import sys 5import signal 6import weakref 7 8import unittest 9 10 11@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill") 12@unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows") 13@unittest.skipIf(sys.platform == 'freebsd6', "Test kills regrtest on freebsd6 " 14 "if threads have been used") 15class TestBreak(unittest.TestCase): 16 int_handler = None 17 18 def setUp(self): 19 self._default_handler = signal.getsignal(signal.SIGINT) 20 if self.int_handler is not None: 21 signal.signal(signal.SIGINT, self.int_handler) 22 23 def tearDown(self): 24 signal.signal(signal.SIGINT, self._default_handler) 25 unittest.signals._results = weakref.WeakKeyDictionary() 26 unittest.signals._interrupt_handler = None 27 28 29 def testInstallHandler(self): 30 default_handler = signal.getsignal(signal.SIGINT) 31 unittest.installHandler() 32 self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler) 33 34 try: 35 pid = os.getpid() 36 os.kill(pid, signal.SIGINT) 37 except KeyboardInterrupt: 38 self.fail("KeyboardInterrupt not handled") 39 40 self.assertTrue(unittest.signals._interrupt_handler.called) 41 42 def testRegisterResult(self): 43 result = unittest.TestResult() 44 unittest.registerResult(result) 45 46 for ref in unittest.signals._results: 47 if ref is result: 48 break 49 elif ref is not result: 50 self.fail("odd object in result set") 51 else: 52 self.fail("result not found") 53 54 55 def testInterruptCaught(self): 56 default_handler = signal.getsignal(signal.SIGINT) 57 58 result = unittest.TestResult() 59 unittest.installHandler() 60 unittest.registerResult(result) 61 62 self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler) 63 64 def test(result): 65 pid = os.getpid() 66 os.kill(pid, signal.SIGINT) 67 result.breakCaught = True 68 self.assertTrue(result.shouldStop) 69 70 try: 71 test(result) 72 except KeyboardInterrupt: 73 self.fail("KeyboardInterrupt not handled") 74 self.assertTrue(result.breakCaught) 75 76 77 def testSecondInterrupt(self): 78 # Can't use skipIf decorator because the signal handler may have 79 # been changed after defining this method. 80 if signal.getsignal(signal.SIGINT) == signal.SIG_IGN: 81 self.skipTest("test requires SIGINT to not be ignored") 82 result = unittest.TestResult() 83 unittest.installHandler() 84 unittest.registerResult(result) 85 86 def test(result): 87 pid = os.getpid() 88 os.kill(pid, signal.SIGINT) 89 result.breakCaught = True 90 self.assertTrue(result.shouldStop) 91 os.kill(pid, signal.SIGINT) 92 self.fail("Second KeyboardInterrupt not raised") 93 94 try: 95 test(result) 96 except KeyboardInterrupt: 97 pass 98 else: 99 self.fail("Second KeyboardInterrupt not raised") 100 self.assertTrue(result.breakCaught) 101 102 103 def testTwoResults(self): 104 unittest.installHandler() 105 106 result = unittest.TestResult() 107 unittest.registerResult(result) 108 new_handler = signal.getsignal(signal.SIGINT) 109 110 result2 = unittest.TestResult() 111 unittest.registerResult(result2) 112 self.assertEqual(signal.getsignal(signal.SIGINT), new_handler) 113 114 result3 = unittest.TestResult() 115 116 def test(result): 117 pid = os.getpid() 118 os.kill(pid, signal.SIGINT) 119 120 try: 121 test(result) 122 except KeyboardInterrupt: 123 self.fail("KeyboardInterrupt not handled") 124 125 self.assertTrue(result.shouldStop) 126 self.assertTrue(result2.shouldStop) 127 self.assertFalse(result3.shouldStop) 128 129 130 def testHandlerReplacedButCalled(self): 131 # Can't use skipIf decorator because the signal handler may have 132 # been changed after defining this method. 133 if signal.getsignal(signal.SIGINT) == signal.SIG_IGN: 134 self.skipTest("test requires SIGINT to not be ignored") 135 # If our handler has been replaced (is no longer installed) but is 136 # called by the *new* handler, then it isn't safe to delay the 137 # SIGINT and we should immediately delegate to the default handler 138 unittest.installHandler() 139 140 handler = signal.getsignal(signal.SIGINT) 141 def new_handler(frame, signum): 142 handler(frame, signum) 143 signal.signal(signal.SIGINT, new_handler) 144 145 try: 146 pid = os.getpid() 147 os.kill(pid, signal.SIGINT) 148 except KeyboardInterrupt: 149 pass 150 else: 151 self.fail("replaced but delegated handler doesn't raise interrupt") 152 153 def testRunner(self): 154 # Creating a TextTestRunner with the appropriate argument should 155 # register the TextTestResult it creates 156 runner = unittest.TextTestRunner(stream=io.StringIO()) 157 158 result = runner.run(unittest.TestSuite()) 159 self.assertIn(result, unittest.signals._results) 160 161 def testWeakReferences(self): 162 # Calling registerResult on a result should not keep it alive 163 result = unittest.TestResult() 164 unittest.registerResult(result) 165 166 ref = weakref.ref(result) 167 del result 168 169 # For non-reference counting implementations 170 gc.collect();gc.collect() 171 self.assertIsNone(ref()) 172 173 174 def testRemoveResult(self): 175 result = unittest.TestResult() 176 unittest.registerResult(result) 177 178 unittest.installHandler() 179 self.assertTrue(unittest.removeResult(result)) 180 181 # Should this raise an error instead? 182 self.assertFalse(unittest.removeResult(unittest.TestResult())) 183 184 try: 185 pid = os.getpid() 186 os.kill(pid, signal.SIGINT) 187 except KeyboardInterrupt: 188 pass 189 190 self.assertFalse(result.shouldStop) 191 192 def testMainInstallsHandler(self): 193 failfast = object() 194 test = object() 195 verbosity = object() 196 result = object() 197 default_handler = signal.getsignal(signal.SIGINT) 198 199 class FakeRunner(object): 200 initArgs = [] 201 runArgs = [] 202 def __init__(self, *args, **kwargs): 203 self.initArgs.append((args, kwargs)) 204 def run(self, test): 205 self.runArgs.append(test) 206 return result 207 208 class Program(unittest.TestProgram): 209 def __init__(self, catchbreak): 210 self.exit = False 211 self.verbosity = verbosity 212 self.failfast = failfast 213 self.catchbreak = catchbreak 214 self.tb_locals = False 215 self.testRunner = FakeRunner 216 self.test = test 217 self.result = None 218 219 p = Program(False) 220 p.runTests() 221 222 self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None, 223 'verbosity': verbosity, 224 'failfast': failfast, 225 'tb_locals': False, 226 'warnings': None})]) 227 self.assertEqual(FakeRunner.runArgs, [test]) 228 self.assertEqual(p.result, result) 229 230 self.assertEqual(signal.getsignal(signal.SIGINT), default_handler) 231 232 FakeRunner.initArgs = [] 233 FakeRunner.runArgs = [] 234 p = Program(True) 235 p.runTests() 236 237 self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None, 238 'verbosity': verbosity, 239 'failfast': failfast, 240 'tb_locals': False, 241 'warnings': None})]) 242 self.assertEqual(FakeRunner.runArgs, [test]) 243 self.assertEqual(p.result, result) 244 245 self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler) 246 247 def testRemoveHandler(self): 248 default_handler = signal.getsignal(signal.SIGINT) 249 unittest.installHandler() 250 unittest.removeHandler() 251 self.assertEqual(signal.getsignal(signal.SIGINT), default_handler) 252 253 # check that calling removeHandler multiple times has no ill-effect 254 unittest.removeHandler() 255 self.assertEqual(signal.getsignal(signal.SIGINT), default_handler) 256 257 def testRemoveHandlerAsDecorator(self): 258 default_handler = signal.getsignal(signal.SIGINT) 259 unittest.installHandler() 260 261 @unittest.removeHandler 262 def test(): 263 self.assertEqual(signal.getsignal(signal.SIGINT), default_handler) 264 265 test() 266 self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler) 267 268@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill") 269@unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows") 270@unittest.skipIf(sys.platform == 'freebsd6', "Test kills regrtest on freebsd6 " 271 "if threads have been used") 272class TestBreakDefaultIntHandler(TestBreak): 273 int_handler = signal.default_int_handler 274 275@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill") 276@unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows") 277@unittest.skipIf(sys.platform == 'freebsd6', "Test kills regrtest on freebsd6 " 278 "if threads have been used") 279class TestBreakSignalIgnored(TestBreak): 280 int_handler = signal.SIG_IGN 281 282@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill") 283@unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows") 284@unittest.skipIf(sys.platform == 'freebsd6', "Test kills regrtest on freebsd6 " 285 "if threads have been used") 286class TestBreakSignalDefault(TestBreak): 287 int_handler = signal.SIG_DFL 288 289 290if __name__ == "__main__": 291 unittest.main() 292