• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#
2# Module which supports allocation of ctypes objects from shared memory
3#
4# multiprocessing/sharedctypes.py
5#
6# Copyright (c) 2006-2008, R Oudkerk
7# All rights reserved.
8#
9# Redistribution and use in source and binary forms, with or without
10# modification, are permitted provided that the following conditions
11# are met:
12#
13# 1. Redistributions of source code must retain the above copyright
14#    notice, this list of conditions and the following disclaimer.
15# 2. Redistributions in binary form must reproduce the above copyright
16#    notice, this list of conditions and the following disclaimer in the
17#    documentation and/or other materials provided with the distribution.
18# 3. Neither the name of author nor the names of any contributors may be
19#    used to endorse or promote products derived from this software
20#    without specific prior written permission.
21#
22# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
23# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
26# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
27# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
28# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
29# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
31# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32# SUCH DAMAGE.
33#
34
35import sys
36import ctypes
37import weakref
38
39from multiprocessing import heap, RLock
40from multiprocessing.forking import assert_spawning, ForkingPickler
41
42__all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized']
43
44#
45#
46#
47
48typecode_to_type = {
49    'c': ctypes.c_char,
50    'b': ctypes.c_byte,  'B': ctypes.c_ubyte,
51    'h': ctypes.c_short, 'H': ctypes.c_ushort,
52    'i': ctypes.c_int,   'I': ctypes.c_uint,
53    'l': ctypes.c_long,  'L': ctypes.c_ulong,
54    'f': ctypes.c_float, 'd': ctypes.c_double
55    }
56try:
57    typecode_to_type['u'] = ctypes.c_wchar
58except AttributeError:
59    pass
60
61
62#
63#
64#
65
66def _new_value(type_):
67    size = ctypes.sizeof(type_)
68    wrapper = heap.BufferWrapper(size)
69    return rebuild_ctype(type_, wrapper, None)
70
71def RawValue(typecode_or_type, *args):
72    '''
73    Returns a ctypes object allocated from shared memory
74    '''
75    type_ = typecode_to_type.get(typecode_or_type, typecode_or_type)
76    obj = _new_value(type_)
77    ctypes.memset(ctypes.addressof(obj), 0, ctypes.sizeof(obj))
78    obj.__init__(*args)
79    return obj
80
81def RawArray(typecode_or_type, size_or_initializer):
82    '''
83    Returns a ctypes array allocated from shared memory
84    '''
85    type_ = typecode_to_type.get(typecode_or_type, typecode_or_type)
86    if isinstance(size_or_initializer, (int, long)):
87        type_ = type_ * size_or_initializer
88        obj = _new_value(type_)
89        ctypes.memset(ctypes.addressof(obj), 0, ctypes.sizeof(obj))
90        return obj
91    else:
92        type_ = type_ * len(size_or_initializer)
93        result = _new_value(type_)
94        result.__init__(*size_or_initializer)
95        return result
96
97def Value(typecode_or_type, *args, **kwds):
98    '''
99    Return a synchronization wrapper for a Value
100    '''
101    lock = kwds.pop('lock', None)
102    if kwds:
103        raise ValueError('unrecognized keyword argument(s): %s' % kwds.keys())
104    obj = RawValue(typecode_or_type, *args)
105    if lock is False:
106        return obj
107    if lock in (True, None):
108        lock = RLock()
109    if not hasattr(lock, 'acquire'):
110        raise AttributeError("'%r' has no method 'acquire'" % lock)
111    return synchronized(obj, lock)
112
113def Array(typecode_or_type, size_or_initializer, **kwds):
114    '''
115    Return a synchronization wrapper for a RawArray
116    '''
117    lock = kwds.pop('lock', None)
118    if kwds:
119        raise ValueError('unrecognized keyword argument(s): %s' % kwds.keys())
120    obj = RawArray(typecode_or_type, size_or_initializer)
121    if lock is False:
122        return obj
123    if lock in (True, None):
124        lock = RLock()
125    if not hasattr(lock, 'acquire'):
126        raise AttributeError("'%r' has no method 'acquire'" % lock)
127    return synchronized(obj, lock)
128
129def copy(obj):
130    new_obj = _new_value(type(obj))
131    ctypes.pointer(new_obj)[0] = obj
132    return new_obj
133
134def synchronized(obj, lock=None):
135    assert not isinstance(obj, SynchronizedBase), 'object already synchronized'
136
137    if isinstance(obj, ctypes._SimpleCData):
138        return Synchronized(obj, lock)
139    elif isinstance(obj, ctypes.Array):
140        if obj._type_ is ctypes.c_char:
141            return SynchronizedString(obj, lock)
142        return SynchronizedArray(obj, lock)
143    else:
144        cls = type(obj)
145        try:
146            scls = class_cache[cls]
147        except KeyError:
148            names = [field[0] for field in cls._fields_]
149            d = dict((name, make_property(name)) for name in names)
150            classname = 'Synchronized' + cls.__name__
151            scls = class_cache[cls] = type(classname, (SynchronizedBase,), d)
152        return scls(obj, lock)
153
154#
155# Functions for pickling/unpickling
156#
157
158def reduce_ctype(obj):
159    assert_spawning(obj)
160    if isinstance(obj, ctypes.Array):
161        return rebuild_ctype, (obj._type_, obj._wrapper, obj._length_)
162    else:
163        return rebuild_ctype, (type(obj), obj._wrapper, None)
164
165def rebuild_ctype(type_, wrapper, length):
166    if length is not None:
167        type_ = type_ * length
168    ForkingPickler.register(type_, reduce_ctype)
169    obj = type_.from_address(wrapper.get_address())
170    obj._wrapper = wrapper
171    return obj
172
173#
174# Function to create properties
175#
176
177def make_property(name):
178    try:
179        return prop_cache[name]
180    except KeyError:
181        d = {}
182        exec template % ((name,)*7) in d
183        prop_cache[name] = d[name]
184        return d[name]
185
186template = '''
187def get%s(self):
188    self.acquire()
189    try:
190        return self._obj.%s
191    finally:
192        self.release()
193def set%s(self, value):
194    self.acquire()
195    try:
196        self._obj.%s = value
197    finally:
198        self.release()
199%s = property(get%s, set%s)
200'''
201
202prop_cache = {}
203class_cache = weakref.WeakKeyDictionary()
204
205#
206# Synchronized wrappers
207#
208
209class SynchronizedBase(object):
210
211    def __init__(self, obj, lock=None):
212        self._obj = obj
213        self._lock = lock or RLock()
214        self.acquire = self._lock.acquire
215        self.release = self._lock.release
216
217    def __reduce__(self):
218        assert_spawning(self)
219        return synchronized, (self._obj, self._lock)
220
221    def get_obj(self):
222        return self._obj
223
224    def get_lock(self):
225        return self._lock
226
227    def __repr__(self):
228        return '<%s wrapper for %s>' % (type(self).__name__, self._obj)
229
230
231class Synchronized(SynchronizedBase):
232    value = make_property('value')
233
234
235class SynchronizedArray(SynchronizedBase):
236
237    def __len__(self):
238        return len(self._obj)
239
240    def __getitem__(self, i):
241        self.acquire()
242        try:
243            return self._obj[i]
244        finally:
245            self.release()
246
247    def __setitem__(self, i, value):
248        self.acquire()
249        try:
250            self._obj[i] = value
251        finally:
252            self.release()
253
254    def __getslice__(self, start, stop):
255        self.acquire()
256        try:
257            return self._obj[start:stop]
258        finally:
259            self.release()
260
261    def __setslice__(self, start, stop, values):
262        self.acquire()
263        try:
264            self._obj[start:stop] = values
265        finally:
266            self.release()
267
268
269class SynchronizedString(SynchronizedArray):
270    value = make_property('value')
271    raw = make_property('raw')
272