• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Generic thread tests.
2
3Meant to be used by dummy_thread and thread.  To allow for different modules
4to be used, test_main() can be called with the module to use as the thread
5implementation as its sole argument.
6
7"""
8import dummy_thread as _thread
9import time
10import Queue
11import random
12import unittest
13from test import test_support
14
15DELAY = 0 # Set > 0 when testing a module other than dummy_thread, such as
16          # the 'thread' module.
17
18class LockTests(unittest.TestCase):
19    """Test lock objects."""
20
21    def setUp(self):
22        # Create a lock
23        self.lock = _thread.allocate_lock()
24
25    def test_initlock(self):
26        #Make sure locks start locked
27        self.assertFalse(self.lock.locked(),
28                        "Lock object is not initialized unlocked.")
29
30    def test_release(self):
31        # Test self.lock.release()
32        self.lock.acquire()
33        self.lock.release()
34        self.assertFalse(self.lock.locked(),
35                        "Lock object did not release properly.")
36
37    def test_improper_release(self):
38        #Make sure release of an unlocked thread raises _thread.error
39        self.assertRaises(_thread.error, self.lock.release)
40
41    def test_cond_acquire_success(self):
42        #Make sure the conditional acquiring of the lock works.
43        self.assertTrue(self.lock.acquire(0),
44                        "Conditional acquiring of the lock failed.")
45
46    def test_cond_acquire_fail(self):
47        #Test acquiring locked lock returns False
48        self.lock.acquire(0)
49        self.assertFalse(self.lock.acquire(0),
50                        "Conditional acquiring of a locked lock incorrectly "
51                         "succeeded.")
52
53    def test_uncond_acquire_success(self):
54        #Make sure unconditional acquiring of a lock works.
55        self.lock.acquire()
56        self.assertTrue(self.lock.locked(),
57                        "Uncondional locking failed.")
58
59    def test_uncond_acquire_return_val(self):
60        #Make sure that an unconditional locking returns True.
61        self.assertIs(self.lock.acquire(1), True,
62                        "Unconditional locking did not return True.")
63        self.assertIs(self.lock.acquire(), True)
64
65    def test_uncond_acquire_blocking(self):
66        #Make sure that unconditional acquiring of a locked lock blocks.
67        def delay_unlock(to_unlock, delay):
68            """Hold on to lock for a set amount of time before unlocking."""
69            time.sleep(delay)
70            to_unlock.release()
71
72        self.lock.acquire()
73        start_time = int(time.time())
74        _thread.start_new_thread(delay_unlock,(self.lock, DELAY))
75        if test_support.verbose:
76            print
77            print "*** Waiting for thread to release the lock "\
78            "(approx. %s sec.) ***" % DELAY
79        self.lock.acquire()
80        end_time = int(time.time())
81        if test_support.verbose:
82            print "done"
83        self.assertGreaterEqual(end_time - start_time, DELAY,
84                        "Blocking by unconditional acquiring failed.")
85
86class MiscTests(unittest.TestCase):
87    """Miscellaneous tests."""
88
89    def test_exit(self):
90        #Make sure _thread.exit() raises SystemExit
91        self.assertRaises(SystemExit, _thread.exit)
92
93    def test_ident(self):
94        #Test sanity of _thread.get_ident()
95        self.assertIsInstance(_thread.get_ident(), int,
96                              "_thread.get_ident() returned a non-integer")
97        self.assertNotEqual(_thread.get_ident(), 0,
98                        "_thread.get_ident() returned 0")
99
100    def test_LockType(self):
101        #Make sure _thread.LockType is the same type as _thread.allocate_locke()
102        self.assertIsInstance(_thread.allocate_lock(), _thread.LockType,
103                              "_thread.LockType is not an instance of what "
104                              "is returned by _thread.allocate_lock()")
105
106    def test_interrupt_main(self):
107        #Calling start_new_thread with a function that executes interrupt_main
108        # should raise KeyboardInterrupt upon completion.
109        def call_interrupt():
110            _thread.interrupt_main()
111        self.assertRaises(KeyboardInterrupt, _thread.start_new_thread,
112                              call_interrupt, tuple())
113
114    def test_interrupt_in_main(self):
115        # Make sure that if interrupt_main is called in main threat that
116        # KeyboardInterrupt is raised instantly.
117        self.assertRaises(KeyboardInterrupt, _thread.interrupt_main)
118
119class ThreadTests(unittest.TestCase):
120    """Test thread creation."""
121
122    def test_arg_passing(self):
123        #Make sure that parameter passing works.
124        def arg_tester(queue, arg1=False, arg2=False):
125            """Use to test _thread.start_new_thread() passes args properly."""
126            queue.put((arg1, arg2))
127
128        testing_queue = Queue.Queue(1)
129        _thread.start_new_thread(arg_tester, (testing_queue, True, True))
130        result = testing_queue.get()
131        self.assertTrue(result[0] and result[1],
132                        "Argument passing for thread creation using tuple failed")
133        _thread.start_new_thread(arg_tester, tuple(), {'queue':testing_queue,
134                                                       'arg1':True, 'arg2':True})
135        result = testing_queue.get()
136        self.assertTrue(result[0] and result[1],
137                        "Argument passing for thread creation using kwargs failed")
138        _thread.start_new_thread(arg_tester, (testing_queue, True), {'arg2':True})
139        result = testing_queue.get()
140        self.assertTrue(result[0] and result[1],
141                        "Argument passing for thread creation using both tuple"
142                        " and kwargs failed")
143
144    def test_multi_creation(self):
145        #Make sure multiple threads can be created.
146        def queue_mark(queue, delay):
147            """Wait for ``delay`` seconds and then put something into ``queue``"""
148            time.sleep(delay)
149            queue.put(_thread.get_ident())
150
151        thread_count = 5
152        testing_queue = Queue.Queue(thread_count)
153        if test_support.verbose:
154            print
155            print "*** Testing multiple thread creation "\
156            "(will take approx. %s to %s sec.) ***" % (DELAY, thread_count)
157        for count in xrange(thread_count):
158            if DELAY:
159                local_delay = round(random.random(), 1)
160            else:
161                local_delay = 0
162            _thread.start_new_thread(queue_mark,
163                                     (testing_queue, local_delay))
164        time.sleep(DELAY)
165        if test_support.verbose:
166            print 'done'
167        self.assertEqual(testing_queue.qsize(), thread_count,
168                        "Not all %s threads executed properly after %s sec." %
169                        (thread_count, DELAY))
170
171def test_main(imported_module=None):
172    global _thread, DELAY
173    if imported_module:
174        _thread = imported_module
175        DELAY = 2
176    if test_support.verbose:
177        print
178        print "*** Using %s as _thread module ***" % _thread
179    test_support.run_unittest(LockTests, MiscTests, ThreadTests)
180
181if __name__ == '__main__':
182    test_main()
183