• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import unittest
2from test import test_support
3from contextlib import closing
4import gc
5import pickle
6import select
7import signal
8import subprocess
9import traceback
10import sys, os, time, errno
11
# Signals cannot be tested on these platforms, so the whole module is
# skipped as soon as it is imported there.
if sys.platform == 'os2' or sys.platform == 'riscos':
    raise unittest.SkipTest("Can't test signal on %s" % sys.platform)
14
15
class HandlerBCalled(Exception):
    """Raised by InterProcessSignalTests.handlerB to report that it ran."""
18
19
def exit_subprocess():
    """Leave the current subprocess at once via os._exit(0).

    A regular exit raises SystemExit, which the test would catch; the
    subprocess would then carry on executing in parallel with the original
    test, and an exponential number of tests would end up running
    concurrently.
    """
    exit_status = 0
    os._exit(exit_status)
28
29
def ignoring_eintr(__func, *args, **kwargs):
    """Call ``__func(*args, **kwargs)`` and return its result.

    An EnvironmentError whose errno is EINTR is swallowed and None is
    returned instead; any other EnvironmentError propagates unchanged.
    """
    try:
        result = __func(*args, **kwargs)
    except EnvironmentError as exc:
        if exc.errno == errno.EINTR:
            return None
        raise
    return result
37
38
39@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
40class InterProcessSignalTests(unittest.TestCase):
41    MAX_DURATION = 20   # Entire test should last at most 20 sec.
42
43    def setUp(self):
44        self.using_gc = gc.isenabled()
45        gc.disable()
46
47    def tearDown(self):
48        if self.using_gc:
49            gc.enable()
50
51    def format_frame(self, frame, limit=None):
52        return ''.join(traceback.format_stack(frame, limit=limit))
53
54    def handlerA(self, signum, frame):
55        self.a_called = True
56        if test_support.verbose:
57            print "handlerA invoked from signal %s at:\n%s" % (
58                signum, self.format_frame(frame, limit=1))
59
60    def handlerB(self, signum, frame):
61        self.b_called = True
62        if test_support.verbose:
63            print "handlerB invoked from signal %s at:\n%s" % (
64                signum, self.format_frame(frame, limit=1))
65        raise HandlerBCalled(signum, self.format_frame(frame))
66
67    def wait(self, child):
68        """Wait for child to finish, ignoring EINTR."""
69        while True:
70            try:
71                child.wait()
72                return
73            except OSError as e:
74                if e.errno != errno.EINTR:
75                    raise
76
77    def run_test(self):
78        # Install handlers. This function runs in a sub-process, so we
79        # don't worry about re-setting the default handlers.
80        signal.signal(signal.SIGHUP, self.handlerA)
81        signal.signal(signal.SIGUSR1, self.handlerB)
82        signal.signal(signal.SIGUSR2, signal.SIG_IGN)
83        signal.signal(signal.SIGALRM, signal.default_int_handler)
84
85        # Variables the signals will modify:
86        self.a_called = False
87        self.b_called = False
88
89        # Let the sub-processes know who to send signals to.
90        pid = os.getpid()
91        if test_support.verbose:
92            print "test runner's pid is", pid
93
94        child = ignoring_eintr(subprocess.Popen, ['kill', '-HUP', str(pid)])
95        if child:
96            self.wait(child)
97            if not self.a_called:
98                time.sleep(1)  # Give the signal time to be delivered.
99        self.assertTrue(self.a_called)
100        self.assertFalse(self.b_called)
101        self.a_called = False
102
103        # Make sure the signal isn't delivered while the previous
104        # Popen object is being destroyed, because __del__ swallows
105        # exceptions.
106        del child
107        try:
108            child = subprocess.Popen(['kill', '-USR1', str(pid)])
109            # This wait should be interrupted by the signal's exception.
110            self.wait(child)
111            time.sleep(1)  # Give the signal time to be delivered.
112            self.fail('HandlerBCalled exception not raised')
113        except HandlerBCalled:
114            self.assertTrue(self.b_called)
115            self.assertFalse(self.a_called)
116            if test_support.verbose:
117                print "HandlerBCalled exception caught"
118
119        child = ignoring_eintr(subprocess.Popen, ['kill', '-USR2', str(pid)])
120        if child:
121            self.wait(child)  # Nothing should happen.
122
123        try:
124            signal.alarm(1)
125            # The race condition in pause doesn't matter in this case,
126            # since alarm is going to raise a KeyboardException, which
127            # will skip the call.
128            signal.pause()
129            # But if another signal arrives before the alarm, pause
130            # may return early.
131            time.sleep(1)
132        except KeyboardInterrupt:
133            if test_support.verbose:
134                print "KeyboardInterrupt (the alarm() went off)"
135        except:
136            self.fail("Some other exception woke us from pause: %s" %
137                      traceback.format_exc())
138        else:
139            self.fail("pause returned of its own accord, and the signal"
140                      " didn't arrive after another second.")
141
142    # Issue 3864. Unknown if this affects earlier versions of freebsd also.
143    @unittest.skipIf(sys.platform=='freebsd6',
144        'inter process signals not reliable (do not mix well with threading) '
145        'on freebsd6')
146    def test_main(self):
147        # This function spawns a child process to insulate the main
148        # test-running process from all the signals. It then
149        # communicates with that child process over a pipe and
150        # re-raises information about any exceptions the child
151        # raises. The real work happens in self.run_test().
152        os_done_r, os_done_w = os.pipe()
153        with closing(os.fdopen(os_done_r)) as done_r, \
154             closing(os.fdopen(os_done_w, 'w')) as done_w:
155            child = os.fork()
156            if child == 0:
157                # In the child process; run the test and report results
158                # through the pipe.
159                try:
160                    done_r.close()
161                    # Have to close done_w again here because
162                    # exit_subprocess() will skip the enclosing with block.
163                    with closing(done_w):
164                        try:
165                            self.run_test()
166                        except:
167                            pickle.dump(traceback.format_exc(), done_w)
168                        else:
169                            pickle.dump(None, done_w)
170                except:
171                    print 'Uh oh, raised from pickle.'
172                    traceback.print_exc()
173                finally:
174                    exit_subprocess()
175
176            done_w.close()
177            # Block for up to MAX_DURATION seconds for the test to finish.
178            r, w, x = select.select([done_r], [], [], self.MAX_DURATION)
179            if done_r in r:
180                tb = pickle.load(done_r)
181                if tb:
182                    self.fail(tb)
183            else:
184                os.kill(child, signal.SIGKILL)
185                self.fail('Test deadlocked after %d seconds.' %
186                          self.MAX_DURATION)
187
188
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class BasicSignalTests(unittest.TestCase):
    """Argument checking and handler bookkeeping of signal.signal() and
    signal.getsignal()."""

    def trivial_signal_handler(self, *args):
        """Do-nothing handler used as a stand-in by the tests."""
        pass

    def test_out_of_range_signal_number_raises_error(self):
        # 4242 serves as the out-of-range signal number.
        self.assertRaises(ValueError, signal.getsignal, 4242)

        self.assertRaises(ValueError, signal.signal, 4242,
                          self.trivial_signal_handler)

    def test_setting_signal_handler_to_none_raises_error(self):
        self.assertRaises(TypeError, signal.signal,
                          signal.SIGUSR1, None)

    def test_getsignal(self):
        hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
        try:
            self.assertEqual(signal.getsignal(signal.SIGHUP),
                             self.trivial_signal_handler)
        finally:
            # Put the previous handler back even when the check above
            # fails, so a failure here cannot leak into other tests.
            signal.signal(signal.SIGHUP, hup)
        self.assertEqual(signal.getsignal(signal.SIGHUP), hup)
210
211
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
class WindowsSignalTests(unittest.TestCase):
    """Checks for signal.signal() that only run on Windows."""

    def test_issue9324(self):
        # Updated for issue #10003, adding SIGBREAK
        def handler(signum, frame):
            return None

        for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE,
                    signal.SIGILL, signal.SIGINT, signal.SIGSEGV,
                    signal.SIGTERM):
            # Set and then reset a handler for signals that work on windows
            previous = signal.signal(sig, handler)
            signal.signal(sig, previous)

        # Both of these signal numbers are expected to be rejected.
        self.assertRaises(ValueError, signal.signal, -1, handler)
        self.assertRaises(ValueError, signal.signal, 7, handler)
228
229
230class WakeupFDTests(unittest.TestCase):
231
232    def test_invalid_fd(self):
233        fd = test_support.make_bad_fd()
234        self.assertRaises(ValueError, signal.set_wakeup_fd, fd)
235
236
237@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
238class WakeupSignalTests(unittest.TestCase):
239    TIMEOUT_FULL = 10
240    TIMEOUT_HALF = 5
241
242    def test_wakeup_fd_early(self):
243        import select
244
245        signal.alarm(1)
246        before_time = time.time()
247        # We attempt to get a signal during the sleep,
248        # before select is called
249        time.sleep(self.TIMEOUT_FULL)
250        mid_time = time.time()
251        self.assertTrue(mid_time - before_time < self.TIMEOUT_HALF)
252        select.select([self.read], [], [], self.TIMEOUT_FULL)
253        after_time = time.time()
254        self.assertTrue(after_time - mid_time < self.TIMEOUT_HALF)
255
256    def test_wakeup_fd_during(self):
257        import select
258
259        signal.alarm(1)
260        before_time = time.time()
261        # We attempt to get a signal during the select call
262        self.assertRaises(select.error, select.select,
263            [self.read], [], [], self.TIMEOUT_FULL)
264        after_time = time.time()
265        self.assertTrue(after_time - before_time < self.TIMEOUT_HALF)
266
267    def setUp(self):
268        import fcntl
269
270        self.alrm = signal.signal(signal.SIGALRM, lambda x,y:None)
271        self.read, self.write = os.pipe()
272        flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0)
273        flags = flags | os.O_NONBLOCK
274        fcntl.fcntl(self.write, fcntl.F_SETFL, flags)
275        self.old_wakeup = signal.set_wakeup_fd(self.write)
276
277    def tearDown(self):
278        signal.set_wakeup_fd(self.old_wakeup)
279        os.close(self.read)
280        os.close(self.write)
281        signal.signal(signal.SIGALRM, self.alrm)
282
283@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
284class SiginterruptTest(unittest.TestCase):
285
286    def setUp(self):
287        """Install a no-op signal handler that can be set to allow
288        interrupts or not, and arrange for the original signal handler to be
289        re-installed when the test is finished.
290        """
291        self.signum = signal.SIGUSR1
292        oldhandler = signal.signal(self.signum, lambda x,y: None)
293        self.addCleanup(signal.signal, self.signum, oldhandler)
294
295    def readpipe_interrupted(self):
296        """Perform a read during which a signal will arrive.  Return True if the
297        read is interrupted by the signal and raises an exception.  Return False
298        if it returns normally.
299        """
300        # Create a pipe that can be used for the read.  Also clean it up
301        # when the test is over, since nothing else will (but see below for
302        # the write end).
303        r, w = os.pipe()
304        self.addCleanup(os.close, r)
305
306        # Create another process which can send a signal to this one to try
307        # to interrupt the read.
308        ppid = os.getpid()
309        pid = os.fork()
310
311        if pid == 0:
312            # Child code: sleep to give the parent enough time to enter the
313            # read() call (there's a race here, but it's really tricky to
314            # eliminate it); then signal the parent process.  Also, sleep
315            # again to make it likely that the signal is delivered to the
316            # parent process before the child exits.  If the child exits
317            # first, the write end of the pipe will be closed and the test
318            # is invalid.
319            try:
320                time.sleep(0.2)
321                os.kill(ppid, self.signum)
322                time.sleep(0.2)
323            finally:
324                # No matter what, just exit as fast as possible now.
325                exit_subprocess()
326        else:
327            # Parent code.
328            # Make sure the child is eventually reaped, else it'll be a
329            # zombie for the rest of the test suite run.
330            self.addCleanup(os.waitpid, pid, 0)
331
332            # Close the write end of the pipe.  The child has a copy, so
333            # it's not really closed until the child exits.  We need it to
334            # close when the child exits so that in the non-interrupt case
335            # the read eventually completes, otherwise we could just close
336            # it *after* the test.
337            os.close(w)
338
339            # Try the read and report whether it is interrupted or not to
340            # the caller.
341            try:
342                d = os.read(r, 1)
343                return False
344            except OSError, err:
345                if err.errno != errno.EINTR:
346                    raise
347                return True
348
349    def test_without_siginterrupt(self):
350        """If a signal handler is installed and siginterrupt is not called
351        at all, when that signal arrives, it interrupts a syscall that's in
352        progress.
353        """
354        i = self.readpipe_interrupted()
355        self.assertTrue(i)
356        # Arrival of the signal shouldn't have changed anything.
357        i = self.readpipe_interrupted()
358        self.assertTrue(i)
359
360    def test_siginterrupt_on(self):
361        """If a signal handler is installed and siginterrupt is called with
362        a true value for the second argument, when that signal arrives, it
363        interrupts a syscall that's in progress.
364        """
365        signal.siginterrupt(self.signum, 1)
366        i = self.readpipe_interrupted()
367        self.assertTrue(i)
368        # Arrival of the signal shouldn't have changed anything.
369        i = self.readpipe_interrupted()
370        self.assertTrue(i)
371
372    def test_siginterrupt_off(self):
373        """If a signal handler is installed and siginterrupt is called with
374        a false value for the second argument, when that signal arrives, it
375        does not interrupt a syscall that's in progress.
376        """
377        signal.siginterrupt(self.signum, 0)
378        i = self.readpipe_interrupted()
379        self.assertFalse(i)
380        # Arrival of the signal shouldn't have changed anything.
381        i = self.readpipe_interrupted()
382        self.assertFalse(i)
383
384
385@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
386class ItimerTest(unittest.TestCase):
387    def setUp(self):
388        self.hndl_called = False
389        self.hndl_count = 0
390        self.itimer = None
391        self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm)
392
393    def tearDown(self):
394        signal.signal(signal.SIGALRM, self.old_alarm)
395        if self.itimer is not None: # test_itimer_exc doesn't change this attr
396            # just ensure that itimer is stopped
397            signal.setitimer(self.itimer, 0)
398
399    def sig_alrm(self, *args):
400        self.hndl_called = True
401        if test_support.verbose:
402            print("SIGALRM handler invoked", args)
403
404    def sig_vtalrm(self, *args):
405        self.hndl_called = True
406
407        if self.hndl_count > 3:
408            # it shouldn't be here, because it should have been disabled.
409            raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
410                "timer.")
411        elif self.hndl_count == 3:
412            # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
413            signal.setitimer(signal.ITIMER_VIRTUAL, 0)
414            if test_support.verbose:
415                print("last SIGVTALRM handler call")
416
417        self.hndl_count += 1
418
419        if test_support.verbose:
420            print("SIGVTALRM handler invoked", args)
421
422    def sig_prof(self, *args):
423        self.hndl_called = True
424        signal.setitimer(signal.ITIMER_PROF, 0)
425
426        if test_support.verbose:
427            print("SIGPROF handler invoked", args)
428
429    def test_itimer_exc(self):
430        # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
431        # defines it ?
432        self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0)
433        # Negative times are treated as zero on some platforms.
434        if 0:
435            self.assertRaises(signal.ItimerError,
436                              signal.setitimer, signal.ITIMER_REAL, -1)
437
438    def test_itimer_real(self):
439        self.itimer = signal.ITIMER_REAL
440        signal.setitimer(self.itimer, 1.0)
441        if test_support.verbose:
442            print("\ncall pause()...")
443        signal.pause()
444
445        self.assertEqual(self.hndl_called, True)
446
447    # Issue 3864. Unknown if this affects earlier versions of freebsd also.
448    @unittest.skipIf(sys.platform in ('freebsd6', 'netbsd5'),
449        'itimer not reliable (does not mix well with threading) on some BSDs.')
450    def test_itimer_virtual(self):
451        self.itimer = signal.ITIMER_VIRTUAL
452        signal.signal(signal.SIGVTALRM, self.sig_vtalrm)
453        signal.setitimer(self.itimer, 0.3, 0.2)
454
455        start_time = time.time()
456        while time.time() - start_time < 60.0:
457            # use up some virtual time by doing real work
458            _ = pow(12345, 67890, 10000019)
459            if signal.getitimer(self.itimer) == (0.0, 0.0):
460                break # sig_vtalrm handler stopped this itimer
461        else: # Issue 8424
462            self.skipTest("timeout: likely cause: machine too slow or load too "
463                          "high")
464
465        # virtual itimer should be (0.0, 0.0) now
466        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
467        # and the handler should have been called
468        self.assertEqual(self.hndl_called, True)
469
470    # Issue 3864. Unknown if this affects earlier versions of freebsd also.
471    @unittest.skipIf(sys.platform=='freebsd6',
472        'itimer not reliable (does not mix well with threading) on freebsd6')
473    def test_itimer_prof(self):
474        self.itimer = signal.ITIMER_PROF
475        signal.signal(signal.SIGPROF, self.sig_prof)
476        signal.setitimer(self.itimer, 0.2, 0.2)
477
478        start_time = time.time()
479        while time.time() - start_time < 60.0:
480            # do some work
481            _ = pow(12345, 67890, 10000019)
482            if signal.getitimer(self.itimer) == (0.0, 0.0):
483                break # sig_prof handler stopped this itimer
484        else: # Issue 8424
485            self.skipTest("timeout: likely cause: machine too slow or load too "
486                          "high")
487
488        # profiling itimer should be (0.0, 0.0) now
489        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
490        # and the handler should have been called
491        self.assertEqual(self.hndl_called, True)
492
def test_main():
    """Hand every test case class of this module to
    test_support.run_unittest()."""
    test_classes = (
        BasicSignalTests,
        InterProcessSignalTests,
        WakeupFDTests,
        WakeupSignalTests,
        SiginterruptTest,
        ItimerTest,
        WindowsSignalTests,
    )
    test_support.run_unittest(*test_classes)
498
499
# Run the whole suite when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()
502