1 #
2 # Module which supports allocation of ctypes objects from shared memory
3 #
4 # multiprocessing/sharedctypes.py
5 #
6 # Copyright (c) 2006-2008, R Oudkerk
7 # Licensed to PSF under a Contributor Agreement.
8 #
9
10 import ctypes
11 import weakref
12
13 from . import heap
14 from . import get_context
15
16 from .context import reduction, assert_spawning
17 _ForkingPickler = reduction.ForkingPickler
18
19 __all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized']
20
21 #
22 #
23 #
24
# Maps one-character type codes to the ctypes type they stand for.
# RawValue() and RawArray() look their first argument up here and fall
# back to treating it as a ctypes type when it is not one of these keys.
typecode_to_type = {
    # character types
    'c': ctypes.c_char,
    'u': ctypes.c_wchar,
    # signed / unsigned integer types, narrowest first
    'b': ctypes.c_byte,
    'B': ctypes.c_ubyte,
    'h': ctypes.c_short,
    'H': ctypes.c_ushort,
    'i': ctypes.c_int,
    'I': ctypes.c_uint,
    'l': ctypes.c_long,
    'L': ctypes.c_ulong,
    'q': ctypes.c_longlong,
    'Q': ctypes.c_ulonglong,
    # floating point types
    'f': ctypes.c_float,
    'd': ctypes.c_double,
}
34
35 #
36 #
37 #
38
def _new_value(type_):
    '''
    Create an instance of the ctypes type `type_` on top of a new
    heap.BufferWrapper sized to hold exactly one such object.

    The storage is not cleared or initialised here; callers do that.
    '''
    wrapper = heap.BufferWrapper(ctypes.sizeof(type_))
    # length=None: `type_` is already the complete type, not an element type.
    return rebuild_ctype(type_, wrapper, None)
43
def RawValue(typecode_or_type, *args):
    '''
    Returns a ctypes object allocated from shared memory

    `typecode_or_type` is either a key of typecode_to_type or a ctypes
    type; any extra positional arguments are passed to the object's
    __init__ after its storage has been zeroed.
    '''
    ctype = typecode_to_type.get(typecode_or_type, typecode_or_type)
    value = _new_value(ctype)
    # Zero the storage first so anything __init__ leaves untouched is 0.
    address = ctypes.addressof(value)
    ctypes.memset(address, 0, ctypes.sizeof(value))
    value.__init__(*args)
    return value
53
def RawArray(typecode_or_type, size_or_initializer):
    '''
    Returns a ctypes array allocated from shared memory

    If `size_or_initializer` is an int it is the array length and the
    array is zero-filled; otherwise it is a sized sequence whose items
    become the array's initial contents.
    '''
    element_type = typecode_to_type.get(typecode_or_type, typecode_or_type)

    if not isinstance(size_or_initializer, int):
        # Initialiser given: its length fixes the array length and its
        # items are handed to the array's __init__.
        array_type = element_type * len(size_or_initializer)
        array = _new_value(array_type)
        array.__init__(*size_or_initializer)
        return array

    # Plain length given: allocate and clear.
    array_type = element_type * size_or_initializer
    array = _new_value(array_type)
    ctypes.memset(ctypes.addressof(array), 0, ctypes.sizeof(array))
    return array
69
def Value(typecode_or_type, *args, lock=True, ctx=None):
    '''
    Return a synchronization wrapper for a Value

    The object is created with RawValue(typecode_or_type, *args).

    lock -- False returns the raw object without a wrapper; True or None
            creates a new RLock from `ctx` (or from get_context() when
            `ctx` is not given); anything else is used as the lock and
            must have an `acquire` attribute.
    ctx  -- optional context used to create the lock and passed on to
            synchronized().

    Raises AttributeError if a supplied lock has no `acquire` attribute.
    '''
    obj = RawValue(typecode_or_type, *args)
    if lock is False:
        return obj
    if lock in (True, None):
        ctx = ctx or get_context()
        lock = ctx.RLock()
    if not hasattr(lock, 'acquire'):
        # Wrap the operand in a 1-tuple: a bare tuple `lock` would be
        # unpacked by %-formatting and raise TypeError instead of the
        # AttributeError intended here.
        raise AttributeError("%r has no method 'acquire'" % (lock,))
    return synchronized(obj, lock, ctx=ctx)
83
def Array(typecode_or_type, size_or_initializer, *, lock=True, ctx=None):
    '''
    Return a synchronization wrapper for a RawArray

    The array is created with RawArray(typecode_or_type,
    size_or_initializer).

    lock -- False returns the raw array without a wrapper; True or None
            creates a new RLock from `ctx` (or from get_context() when
            `ctx` is not given); anything else is used as the lock and
            must have an `acquire` attribute.
    ctx  -- optional context used to create the lock and passed on to
            synchronized().

    Raises AttributeError if a supplied lock has no `acquire` attribute.
    '''
    obj = RawArray(typecode_or_type, size_or_initializer)
    if lock is False:
        return obj
    if lock in (True, None):
        ctx = ctx or get_context()
        lock = ctx.RLock()
    if not hasattr(lock, 'acquire'):
        # Wrap the operand in a 1-tuple: a bare tuple `lock` would be
        # unpacked by %-formatting and raise TypeError instead of the
        # AttributeError intended here.
        raise AttributeError("%r has no method 'acquire'" % (lock,))
    return synchronized(obj, lock, ctx=ctx)
97
def copy(obj):
    '''
    Return a new object of the same ctypes type as `obj`, allocated
    through _new_value(), holding a copy of obj's contents.
    '''
    duplicate = _new_value(type(obj))
    # Assigning through a pointer copies obj's contents into the new storage.
    target = ctypes.pointer(duplicate)
    target[0] = obj
    return duplicate
102
def synchronized(obj, lock=None, ctx=None):
    '''
    Wrap the ctypes object `obj` in the matching Synchronized* class.

    Simple ctypes values get Synchronized, arrays of c_char get
    SynchronizedString, other arrays get SynchronizedArray.  For any
    other object a wrapper class with one locked property per entry of
    the type's _fields_ is generated and cached per type in class_cache.
    '''
    assert not isinstance(obj, SynchronizedBase), 'object already synchronized'
    if not ctx:
        ctx = get_context()

    if isinstance(obj, ctypes._SimpleCData):
        wrapper_cls = Synchronized
    elif isinstance(obj, ctypes.Array):
        if obj._type_ is ctypes.c_char:
            wrapper_cls = SynchronizedString
        else:
            wrapper_cls = SynchronizedArray
    else:
        struct_type = type(obj)
        wrapper_cls = class_cache.get(struct_type)
        if wrapper_cls is None:
            # First time this type is seen: build its wrapper class.
            field_names = [field[0] for field in struct_type._fields_]
            members = dict((name, make_property(name)) for name in field_names)
            wrapper_name = 'Synchronized' + struct_type.__name__
            wrapper_cls = type(wrapper_name, (SynchronizedBase,), members)
            class_cache[struct_type] = wrapper_cls

    return wrapper_cls(obj, lock, ctx)
123
124 #
125 # Functions for pickling/unpickling
126 #
127
def reduce_ctype(obj):
    '''
    Reduction function for ctypes objects created by rebuild_ctype().

    Returns (rebuild_ctype, args).  For arrays the element type and the
    length are sent separately; for everything else the object's own
    type is sent with a length of None.
    '''
    assert_spawning(obj)
    if not isinstance(obj, ctypes.Array):
        return rebuild_ctype, (type(obj), obj._wrapper, None)
    return rebuild_ctype, (obj._type_, obj._wrapper, obj._length_)
134
def rebuild_ctype(type_, wrapper, length):
    '''
    Build a ctypes object of `type_` over the buffer held by `wrapper`.

    When `length` is not None, `type_` is an element type and the object
    built is an array of that many elements.  The resulting type is
    registered with the forking pickler so it reduces via reduce_ctype().
    '''
    ctype = type_ if length is None else type_ * length
    _ForkingPickler.register(ctype, reduce_ctype)
    view = wrapper.create_memoryview()
    instance = ctype.from_buffer(view)
    # Keep a reference to the wrapper on the object; reduce_ctype() reads it.
    instance._wrapper = wrapper
    return instance
143
144 #
145 # Function to create properties
146 #
147
def make_property(name):
    '''
    Return a property that reads and writes attribute `name` of
    `self._obj` while holding the owner's lock.

    The accessors call self.acquire() before and self.release() after
    touching the attribute.  Properties are cached in prop_cache, so
    repeated calls with the same name return the same property object.

    The accessors are closures using getattr()/setattr() rather than
    code generated with exec(), so names that are keywords or not valid
    identifiers work and no interpolated text is ever run as code.
    '''
    try:
        return prop_cache[name]
    except KeyError:
        pass

    def getter(self):
        self.acquire()
        try:
            return getattr(self._obj, name)
        finally:
            self.release()

    def setter(self, value):
        self.acquire()
        try:
            setattr(self._obj, name, value)
        finally:
            self.release()

    # Keep the accessor names the generated code used to have.
    getter.__name__ = getter.__qualname__ = 'get%s' % (name,)
    setter.__name__ = setter.__qualname__ = 'set%s' % (name,)

    prop = prop_cache[name] = property(getter, setter)
    return prop

# Source template formerly exec()'d by make_property().  It is no longer
# used by this module and is kept only so the module-level name stays
# available to any code that refers to it.
template = '''
def get%s(self):
    self.acquire()
    try:
        return self._obj.%s
    finally:
        self.release()
def set%s(self, value):
    self.acquire()
    try:
        self._obj.%s = value
    finally:
        self.release()
%s = property(get%s, set%s)
'''

# attribute name -> property built by make_property()
prop_cache = {}
# wrapped type -> generated Synchronized<Name> class; weak keys, so an
# entry does not keep its type alive
class_cache = weakref.WeakKeyDictionary()
175
176 #
177 # Synchronized wrappers
178 #
179
class SynchronizedBase(object):
    '''
    Base class pairing a wrapped object with a lock.

    Instances expose the lock's acquire/release as their own attributes
    and can be used as a context manager that delegates to the lock.
    '''

    def __init__(self, obj, lock=None, ctx=None):
        '''
        Store `obj` and `lock`; when `lock` is falsy a new RLock is
        created from `ctx`, or from get_context(force=True) if `ctx` is
        not given.
        '''
        self._obj = obj
        if not lock:
            if not ctx:
                ctx = get_context(force=True)
            lock = ctx.RLock()
        self._lock = lock
        # Expose the lock's bound methods directly on the wrapper.
        self.acquire = lock.acquire
        self.release = lock.release

    def __enter__(self):
        '''Enter the lock's context and return whatever it returns.'''
        return self._lock.__enter__()

    def __exit__(self, *exc_info):
        '''Leave the lock's context, forwarding the exception details.'''
        return self._lock.__exit__(*exc_info)

    def __reduce__(self):
        '''Reduce to a synchronized() call on the wrapped object and lock.'''
        assert_spawning(self)
        return synchronized, (self._obj, self._lock)

    def get_obj(self):
        '''Return the wrapped object.'''
        return self._obj

    def get_lock(self):
        '''Return the lock guarding the wrapped object.'''
        return self._lock

    def __repr__(self):
        cls_name = type(self).__name__
        return '<%s wrapper for %s>' % (cls_name, self._obj)
210
211
class Synchronized(SynchronizedBase):
    '''
    Wrapper used by synchronized() for simple ctypes values.
    '''
    # Reads and writes of `.value` go to the wrapped object under the lock.
    value = make_property('value')
214
215
class SynchronizedArray(SynchronizedBase):
    '''
    Wrapper used by synchronized() for ctypes arrays; item and slice
    access on the wrapped array is performed while holding the lock.
    '''

    def __len__(self):
        # The length is read without taking the lock.
        return len(self._obj)

    def __getitem__(self, i):
        with self:
            item = self._obj[i]
        return item

    def __setitem__(self, i, value):
        with self:
            self._obj[i] = value

    def __getslice__(self, start, stop):
        with self:
            chunk = self._obj[start:stop]
        return chunk

    def __setslice__(self, start, stop, values):
        with self:
            self._obj[start:stop] = values
236
237
class SynchronizedString(SynchronizedArray):
    '''
    Wrapper used by synchronized() for arrays whose element type is
    ctypes.c_char.
    '''
    # Locked access to the wrapped array's `value` and `raw` attributes.
    value = make_property('value')
    raw = make_property('raw')