• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Run the _testcapi module tests (tests for the Python/C API):  by defn,
2# these are all functions _testcapi exports whose name begins with 'test_'.
3
4from __future__ import with_statement
5import sys
6import time
7import random
8import unittest
9from test import test_support
10try:
11    import threading
12except ImportError:
13    threading = None
14import _testcapi
15
@unittest.skipUnless(threading, 'Threading required for this test.')
class TestPendingCalls(unittest.TestCase):
    """Exercise pending-call dispatch via _testcapi._pending_threadfunc.

    Callbacks are submitted either from the main thread or from worker
    threads; the main thread then busy-loops until every callback has run.
    """

    def pendingcalls_submit(self, l, n):
        """Submit n pending calls, each of which appends None to l.

        Each submission is retried until _testcapi._pending_threadfunc
        returns a true value.
        """
        def callback():
            #this function can be interrupted by thread switching so let's
            #use an atomic operation
            l.append(None)

        for i in range(n):
            time.sleep(random.random()*0.02) #0.01 secs on average
            #try submitting callback until successful.
            #rely on regular interrupt to flush queue if we are
            #unsuccessful.
            while not _testcapi._pending_threadfunc(callback):
                pass

    def pendingcalls_wait(self, l, n, context=None):
        """Busy-wait until l holds exactly n entries.

        When context is given, loop iterations only count towards the
        timeout once context.event has been set.  The test fails as soon as
        10000 iterations have been counted.
        """
        #now, stick around until l has grown to n entries
        count = 0
        while len(l) != n:
            #this busy loop is where we expect to be interrupted to
            #run our callbacks.  Note that callbacks are only run on the
            #main thread
            for i in xrange(1000):
                a = i*i
            if context and not context.event.is_set():
                #submitters are still running: do not count this iteration
                continue
            count += 1
            self.assertTrue(count < 10000,
                "timeout waiting for %i callbacks, got %i"%(n, len(l)))

    def test_pendingcalls_threaded(self):
        """Submit the callbacks from several threads, wait on this one."""
        #do every callback on a separate thread
        n = 32 #total callbacks
        threads = []

        class Context(object):
            """Plain attribute bag shared with the worker threads."""

        context = Context()
        context.l = []
        context.n = 2 #submits per thread
        context.nThreads = n // context.n
        context.nFinished = 0
        context.lock = threading.Lock()
        context.event = threading.Event()

        for i in range(context.nThreads):
            t = threading.Thread(target=self.pendingcalls_thread,
                                 args=(context,))
            t.start()
            threads.append(t)

        self.pendingcalls_wait(context.l, n, context)

        for t in threads:
            t.join()

    def pendingcalls_thread(self, context):
        """Worker body: submit context.n callbacks, then report completion.

        The last worker to finish sets context.event.
        """
        try:
            self.pendingcalls_submit(context.l, context.n)
        finally:
            #take a snapshot of the counter while holding the lock
            with context.lock:
                context.nFinished += 1
                nFinished = context.nFinished
            if nFinished == context.nThreads:
                context.event.set()

    def test_pendingcalls_non_threaded(self):
        """Submit and wait for all callbacks on the calling thread only."""
        #again, just using the main thread, likely they will all be dispatched at
        #once.  It is ok to ask for too many, because we loop until we find a slot.
        #the loop can be interrupted to dispatch.
        #there are only 32 dispatch slots, so we go for twice that!
        l = []
        n = 64
        self.pendingcalls_submit(l, n)
        self.pendingcalls_wait(l, n)
97
98
def test_main():
    """Run every _testcapi 'test_*' function, then the Python-level tests.

    Raises:
        test_support.TestFailed: if a _testcapi test raises _testcapi.error
            or the thread-state check does not see the expected idents.
            Any exception raised by the thread-state check in the secondary
            thread is re-raised here as well.
    """
    for name in dir(_testcapi):
        if name.startswith('test_'):
            test = getattr(_testcapi, name)
            if test_support.verbose:
                print("internal %s" % name)
            try:
                test()
            except _testcapi.error:
                raise test_support.TestFailed(sys.exc_info()[1])

    # some extra thread-state tests driven via _testcapi
    def TestThreadState():
        if test_support.verbose:
            print("auto-thread-state")

        idents = []

        def callback():
            idents.append(thread.get_ident())

        _testcapi._test_thread_state(callback)
        # NOTE(review): a and b are never read; presumably they only hold
        # extra references to callback -- confirm before removing.
        a = b = callback
        # NOTE(review): presumably gives pending callbacks time to run --
        # confirm against _testcapi._test_thread_state.
        time.sleep(1)
        # Check our main thread is in the list exactly 3 times.
        if idents.count(thread.get_ident()) != 3:
            raise test_support.TestFailed(
                "Couldn't find main thread correctly in the list")

    if threading:
        import thread
        TestThreadState()

        # An exception raised inside a Thread target never reaches the
        # caller, so capture it and re-raise it here; otherwise a failing
        # check in the secondary thread would go unnoticed.
        failures = []

        def run_in_thread():
            try:
                TestThreadState()
            except Exception:
                failures.append(sys.exc_info()[1])

        t = threading.Thread(target=run_in_thread)
        t.start()
        t.join()
        if failures:
            raise failures[0]

    test_support.run_unittest(TestPendingCalls)

# Allow running this test module directly as a script.
if __name__ == "__main__":
    test_main()
141