• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Drop-in replacement for the thread module.
2
3Meant to be used as a brain-dead substitute so that threaded code does
4not need to be rewritten for when the thread module is not present.
5
6Suggested usage is::
7
8    try:
9        import thread
10    except ImportError:
11        import dummy_thread as thread
12
13"""
# Export every public name specified by the thread documentation, including
# stack_size(); the obsolete synonyms allocate(), start_new() and
# exit_thread() are deliberately skipped.
__all__ = ['error', 'start_new_thread', 'exit', 'get_ident', 'allocate_lock',
           'interrupt_main', 'LockType', 'stack_size']
18
19import traceback as _traceback
20
class error(Exception):
    """Dummy counterpart of thread.error.

    Raised by this module where the real thread module would raise
    thread.error (releasing an unlocked lock, setting a stack size).
    """

    def __init__(self, *values):
        # Keep the positional arguments, unchanged, as the exception's args.
        self.args = values
26
def start_new_thread(function, args, kwargs={}):
    """Dummy implementation of thread.start_new_thread().

    No thread is created: ``function(*args, **kwargs)`` is simply run to
    completion in the caller before this function returns.

    ``args`` must be a tuple and ``kwargs`` a dictionary (subclasses of
    either are accepted); otherwise TypeError is raised and ``function``
    is not called.

    If ``function`` raises SystemExit (which can be done by calling
    exit()) the exception is swallowed; every other exception is printed
    with traceback.print_exc() and not propagated.

    If interrupt_main() was called while ``function`` was running,
    KeyboardInterrupt is raised once control is back in the main (i.e.
    outermost, non-"thread") context.  For a plain, non-nested call that
    means as soon as ``function`` has returned.

    """
    # NOTE: the shared ``{}`` default is harmless here because kwargs is
    # only ever unpacked with ``**``, never mutated.
    if not isinstance(args, tuple):
        raise TypeError("2nd arg must be a tuple")
    if not isinstance(kwargs, dict):
        raise TypeError("3rd arg must be a dict")
    global _main, _interrupt
    # Remember whether we were already inside a "thread" so that nested
    # calls do not prematurely flag the outer "thread" as the main one.
    was_main = _main
    _main = False
    try:
        function(*args, **kwargs)
    except SystemExit:
        pass
    except:
        # Deliberately broad: like a real thread, nothing raised by the
        # function may propagate to the caller; it is only reported.
        _traceback.print_exc()
    finally:
        # Restore the flag even if reporting the traceback itself failed.
        _main = was_main
    if _main and _interrupt:
        _interrupt = False
        raise KeyboardInterrupt
57
def exit():
    """Dummy implementation of thread.exit().

    Always raises SystemExit; start_new_thread() catches it silently.
    """
    raise SystemExit()
61
def get_ident():
    """Dummy implementation of thread.get_ident().

    This module is only meant to be used when the real thread module is
    unavailable, so the current process is assumed to be the only
    thread and the same constant identifier is returned on every call.
    """
    only_thread_ident = -1
    return only_thread_ident
70
def allocate_lock():
    """Dummy implementation of thread.allocate_lock().

    Returns a freshly created LockType instance.
    """
    new_lock = LockType()
    return new_lock
74
def stack_size(size=None):
    """Dummy implementation of thread.stack_size().

    Called without a size (or with None) it returns 0.  Any other value
    is treated as an attempt to set the stack size, which is not
    supported here, so ``error`` is raised.
    """
    if size is None:
        return 0
    raise error("setting thread stack size not supported")
80
class LockType(object):
    """Dummy stand-in for thread.LockType.

    The state of the lock is kept in the boolean attribute
    ``locked_status``.  Instances should not be pickled: none of the
    methods here are atomic, so an unpickled lock handed to code using
    the real thread module could cause problems.

    """

    def __init__(self):
        # A freshly created lock is unlocked.
        self.locked_status = False

    def acquire(self, waitflag=None):
        """Dummy implementation of acquire().

        A blocking call (``waitflag`` omitted, None or true) always
        marks the lock as held and returns True, even if it was already
        held, since there is no other thread to wait for.  A
        non-blocking call (``waitflag`` false) checks the current state:
        it returns False and leaves the lock alone when it is already
        held, otherwise it takes the lock and returns True.  This keeps
        threading.Condition's assert statements from being triggered.

        """
        nonblocking = waitflag is not None and not waitflag
        if nonblocking and self.locked_status:
            return False
        self.locked_status = True
        return True

    # Entering a ``with`` block is the same as calling acquire().
    __enter__ = acquire

    def __exit__(self, typ, val, tb):
        # Leaving the ``with`` block always releases the lock.
        self.release()

    def release(self):
        """Release the dummy lock.

        Returns True on success and raises ``error`` if the lock is not
        currently held.
        """
        # XXX Testing the state may be unnecessary and could lead to
        #     problems for complex, threaded code.
        if self.locked_status:
            self.locked_status = False
            return True
        raise error

    def locked(self):
        """Return the current value of ``locked_status``."""
        return self.locked_status
132
# True while no start_new_thread() call is in progress, i.e. when the code
# running is not inside a "thread".
_main = True
# Set by interrupt_main() when it is called from inside a "thread";
# start_new_thread() checks it after the function has finished.
_interrupt = False
137
def interrupt_main():
    """Dummy implementation of thread.interrupt_main().

    When called outside of a "thread" (``_main`` is true)
    KeyboardInterrupt is raised immediately.  When called from inside a
    "thread" the ``_interrupt`` flag is set instead, so that
    start_new_thread() raises KeyboardInterrupt upon exiting.
    """
    global _interrupt
    if not _main:
        _interrupt = True
        return
    raise KeyboardInterrupt
146