• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#
2# Module which supports allocation of ctypes objects from shared memory
3#
4# multiprocessing/sharedctypes.py
5#
6# Copyright (c) 2006-2008, R Oudkerk
7# Licensed to PSF under a Contributor Agreement.
8#
9
10import ctypes
11import weakref
12
13from . import heap
14from . import get_context
15
16from .context import reduction, assert_spawning
17_ForkingPickler = reduction.ForkingPickler
18
19__all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized']
20
21#
22#
23#
24
# Single-character type codes mapped to ctypes types.  RawValue() and
# RawArray() look their first argument up here and fall back to using
# the argument itself when it is not one of these keys.
typecode_to_type = {
    'c': ctypes.c_char,
    'u': ctypes.c_wchar,
    'b': ctypes.c_byte,
    'B': ctypes.c_ubyte,
    'h': ctypes.c_short,
    'H': ctypes.c_ushort,
    'i': ctypes.c_int,
    'I': ctypes.c_uint,
    'l': ctypes.c_long,
    'L': ctypes.c_ulong,
    'q': ctypes.c_longlong,
    'Q': ctypes.c_ulonglong,
    'f': ctypes.c_float,
    'd': ctypes.c_double,
}
34
35#
36#
37#
38
def _new_value(type_):
    '''
    Build an instance of the ctypes type `type_` on top of a new
    heap.BufferWrapper that is exactly ctypes.sizeof(type_) bytes long.
    '''
    nbytes = ctypes.sizeof(type_)
    return rebuild_ctype(type_, heap.BufferWrapper(nbytes), None)
43
def RawValue(typecode_or_type, *args):
    '''
    Returns a ctypes object allocated from shared memory

    `typecode_or_type` is either a key of `typecode_to_type` or a ctypes
    type that is used as given.  `args` are forwarded to the new object's
    __init__ after its memory has been zeroed.
    '''
    ctype = typecode_to_type.get(typecode_or_type, typecode_or_type)
    value = _new_value(ctype)
    # Clear the backing memory before the initializer runs.
    address = ctypes.addressof(value)
    nbytes = ctypes.sizeof(value)
    ctypes.memset(address, 0, nbytes)
    value.__init__(*args)
    return value
53
def RawArray(typecode_or_type, size_or_initializer):
    '''
    Returns a ctypes array allocated from shared memory

    When `size_or_initializer` is an int it is the array length and the
    array is zero-filled.  Otherwise it is treated as a sequence: its
    len() gives the array length and its items initialize the elements.
    '''
    element_type = typecode_to_type.get(typecode_or_type, typecode_or_type)

    if not isinstance(size_or_initializer, int):
        # Initializer given: one element per item.
        array = _new_value(element_type * len(size_or_initializer))
        array.__init__(*size_or_initializer)
        return array

    # Plain length given: allocate and zero the whole array.
    array = _new_value(element_type * size_or_initializer)
    address = ctypes.addressof(array)
    ctypes.memset(address, 0, ctypes.sizeof(array))
    return array
69
def Value(typecode_or_type, *args, lock=True, ctx=None):
    '''
    Return a synchronization wrapper for a Value

    `typecode_or_type` and `args` are passed straight to RawValue().

    `lock` selects the synchronization:
      * False      -- no wrapper; the raw object is returned as is.
      * True/None  -- a new RLock is created from `ctx` (or from
                      get_context() when `ctx` is not given).
      * otherwise  -- `lock` itself is used; it must have an `acquire`
                      attribute.

    Raises AttributeError if the supplied lock has no `acquire` attribute.
    '''
    obj = RawValue(typecode_or_type, *args)
    if lock is False:
        return obj
    if lock in (True, None):
        ctx = ctx or get_context()
        lock = ctx.RLock()
    if not hasattr(lock, 'acquire'):
        # Wrap in a 1-tuple: if `lock` is itself a tuple, "fmt % lock"
        # would unpack it as the format arguments and raise TypeError
        # instead of the intended AttributeError.
        raise AttributeError("%r has no method 'acquire'" % (lock,))
    return synchronized(obj, lock, ctx=ctx)
83
def Array(typecode_or_type, size_or_initializer, *, lock=True, ctx=None):
    '''
    Return a synchronization wrapper for a RawArray

    `typecode_or_type` and `size_or_initializer` are passed straight to
    RawArray().

    `lock` selects the synchronization:
      * False      -- no wrapper; the raw array is returned as is.
      * True/None  -- a new RLock is created from `ctx` (or from
                      get_context() when `ctx` is not given).
      * otherwise  -- `lock` itself is used; it must have an `acquire`
                      attribute.

    Raises AttributeError if the supplied lock has no `acquire` attribute.
    '''
    obj = RawArray(typecode_or_type, size_or_initializer)
    if lock is False:
        return obj
    if lock in (True, None):
        ctx = ctx or get_context()
        lock = ctx.RLock()
    if not hasattr(lock, 'acquire'):
        # Wrap in a 1-tuple: if `lock` is itself a tuple, "fmt % lock"
        # would unpack it as the format arguments and raise TypeError
        # instead of the intended AttributeError.
        raise AttributeError("%r has no method 'acquire'" % (lock,))
    return synchronized(obj, lock, ctx=ctx)
97
def copy(obj):
    '''
    Return a new object of type(obj), created through _new_value(), whose
    contents are assigned from `obj`.
    '''
    duplicate = _new_value(type(obj))
    # Assigning through a pointer copies obj's contents into the new object.
    destination = ctypes.pointer(duplicate)
    destination[0] = obj
    return duplicate
102
def synchronized(obj, lock=None, ctx=None):
    '''
    Wrap the ctypes object `obj` in the matching Synchronized* class.

    Simple ctypes values get Synchronized, arrays of c_char get
    SynchronizedString, other arrays get SynchronizedArray.  Any other
    object is expected to expose `_fields_`; a wrapper class with one
    locked property per field is generated for its type and remembered
    in `class_cache`.

    `lock` and `ctx` are handed to the wrapper's constructor; `ctx`
    defaults to get_context().
    '''
    assert not isinstance(obj, SynchronizedBase), 'object already synchronized'
    ctx = ctx or get_context()

    if isinstance(obj, ctypes._SimpleCData):
        wrapper_cls = Synchronized
    elif isinstance(obj, ctypes.Array):
        if obj._type_ is ctypes.c_char:
            wrapper_cls = SynchronizedString
        else:
            wrapper_cls = SynchronizedArray
    else:
        obj_type = type(obj)
        wrapper_cls = class_cache.get(obj_type)
        if wrapper_cls is None:
            # First time this type is seen: generate and cache a wrapper
            # class with a property for every declared field.
            field_names = [entry[0] for entry in obj_type._fields_]
            members = {
                field_name: make_property(field_name)
                for field_name in field_names
            }
            wrapper_name = 'Synchronized' + obj_type.__name__
            wrapper_cls = type(wrapper_name, (SynchronizedBase,), members)
            class_cache[obj_type] = wrapper_cls

    return wrapper_cls(obj, lock, ctx)
123
124#
125# Functions for pickling/unpickling
126#
127
def reduce_ctype(obj):
    '''
    Reduction function for ctypes objects created by rebuild_ctype().

    Returns (rebuild_ctype, args).  For arrays the element type and the
    length are passed separately; for anything else the object's own type
    is passed with a length of None.
    '''
    assert_spawning(obj)
    if not isinstance(obj, ctypes.Array):
        return rebuild_ctype, (type(obj), obj._wrapper, None)
    return rebuild_ctype, (obj._type_, obj._wrapper, obj._length_)
134
def rebuild_ctype(type_, wrapper, length):
    '''
    Create a ctypes object of `type_` (or of `type_ * length` when
    `length` is not None) backed by the memoryview of `wrapper`.

    The resulting type is registered with the forking pickler so that
    instances are reduced by reduce_ctype().
    '''
    ctype = type_ if length is None else type_ * length
    _ForkingPickler.register(ctype, reduce_ctype)
    view = wrapper.create_memoryview()
    instance = ctype.from_buffer(view)
    # reduce_ctype() reads this attribute back when pickling the object.
    instance._wrapper = wrapper
    return instance
143
144#
145# Function to create properties
146#
147
def make_property(name):
    '''
    Return the property that gives lock-protected access to the
    attribute `name` of a wrapper's underlying object.

    The property is generated by filling `name` into `template` and
    exec-ing the result; it is stored in `prop_cache` for reuse.
    '''
    try:
        return prop_cache[name]
    except KeyError:
        pass

    namespace = {}
    source = template % ((name,) * 7)
    exec(source, namespace)
    generated = namespace[name]
    prop_cache[name] = generated
    return generated
156
# prop_cache maps an attribute name to the property generated for it by
# make_property(); class_cache maps a ctypes class to the wrapper class
# generated for it by synchronized() (weakly keyed on the ctypes class).
prop_cache = {}
class_cache = weakref.WeakKeyDictionary()

# Source text of one getter/setter/property triple.  All seven %s slots
# receive the same attribute name; both accessors hold the wrapper's
# lock (acquire/release) around the access to self._obj.
template = '''
def get%s(self):
    self.acquire()
    try:
        return self._obj.%s
    finally:
        self.release()
def set%s(self, value):
    self.acquire()
    try:
        self._obj.%s = value
    finally:
        self.release()
%s = property(get%s, set%s)
'''
175
176#
177# Synchronized wrappers
178#
179
class SynchronizedBase:
    '''
    Pairs an object with a lock.

    The lock's `acquire` and `release` are exposed directly on the
    wrapper, and the wrapper can be used as a context manager that
    delegates to the lock.
    '''

    def __init__(self, obj, lock=None, ctx=None):
        '''
        Store `obj` and `lock`.  When `lock` is falsy a new RLock is
        created from `ctx`, or from get_context(force=True) when `ctx`
        is falsy too.
        '''
        self._obj = obj
        if not lock:
            ctx = ctx or get_context(force=True)
            lock = ctx.RLock()
        self._lock = lock
        self.acquire = lock.acquire
        self.release = lock.release

    def __enter__(self):
        # Returns whatever the lock's own __enter__ returns.
        return self._lock.__enter__()

    def __exit__(self, *args):
        return self._lock.__exit__(*args)

    def __reduce__(self):
        '''Reduce to a synchronized() call on the object and the lock.'''
        assert_spawning(self)
        rebuild_args = (self._obj, self._lock)
        return synchronized, rebuild_args

    def get_obj(self):
        '''Return the wrapped object.'''
        return self._obj

    def get_lock(self):
        '''Return the lock used for synchronization.'''
        return self._lock

    def __repr__(self):
        class_name = type(self).__name__
        return '<%s wrapper for %s>' % (class_name, self._obj)
210
211
class Synchronized(SynchronizedBase):
    '''Wrapper whose `value` attribute is read and written under the lock.'''

    value = make_property('value')
214
215
class SynchronizedArray(SynchronizedBase):
    '''
    Sequence-style wrapper: item and slice access to the wrapped object
    is performed while the wrapper's lock is held.
    '''

    def __len__(self):
        # Length is read without taking the lock.
        return len(self._obj)

    def __getitem__(self, i):
        with self:
            item = self._obj[i]
        return item

    def __setitem__(self, i, value):
        with self:
            self._obj[i] = value

    # __getslice__/__setslice__ are the Python 2 slice hooks.  Python 3
    # routes slicing through __getitem__/__setitem__, so these only run
    # when called explicitly.

    def __getslice__(self, start, stop):
        with self:
            chunk = self._obj[start:stop]
        return chunk

    def __setslice__(self, start, stop, values):
        with self:
            self._obj[start:stop] = values
236
237
class SynchronizedString(SynchronizedArray):
    '''
    Array wrapper that additionally exposes the wrapped object's `value`
    and `raw` attributes through lock-protected properties.
    '''

    value = make_property('value')
    raw = make_property('raw')
241