1 #
2 # Module implementing synchronization primitives
3 #
4 # multiprocessing/synchronize.py
5 #
6 # Copyright (c) 2006-2008, R Oudkerk
7 # Licensed to PSF under a Contributor Agreement.
8 #
9
10 __all__ = [
11 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event'
12 ]
13
14 import threading
15 import sys
16 import tempfile
17 import _multiprocessing
18 import time
19
20 from . import context
21 from . import process
22 from . import util
23
24 # Try to import the mp.synchronize module cleanly, if it fails
25 # raise ImportError for platforms lacking a working sem_open implementation.
26 # See issue 3770
27 try:
28 from _multiprocessing import SemLock, sem_unlink
29 except (ImportError):
30 raise ImportError("This platform lacks a functioning sem_open" +
31 " implementation, therefore, the required" +
32 " synchronization primitives needed will not" +
33 " function, see issue 3770.")
34
35 #
36 # Constants
37 #
38
39 RECURSIVE_MUTEX, SEMAPHORE = list(range(2))
40 SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX
41
42 #
43 # Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock`
44 #
45
46 class ESC[4;38;5;81mSemLock(ESC[4;38;5;149mobject):
47
48 _rand = tempfile._RandomNameSequence()
49
50 def __init__(self, kind, value, maxvalue, *, ctx):
51 if ctx is None:
52 ctx = context._default_context.get_context()
53 self._is_fork_ctx = ctx.get_start_method() == 'fork'
54 unlink_now = sys.platform == 'win32' or self._is_fork_ctx
55 for i in range(100):
56 try:
57 sl = self._semlock = _multiprocessing.SemLock(
58 kind, value, maxvalue, self._make_name(),
59 unlink_now)
60 except FileExistsError:
61 pass
62 else:
63 break
64 else:
65 raise FileExistsError('cannot find name for semaphore')
66
67 util.debug('created semlock with handle %s' % sl.handle)
68 self._make_methods()
69
70 if sys.platform != 'win32':
71 def _after_fork(obj):
72 obj._semlock._after_fork()
73 util.register_after_fork(self, _after_fork)
74
75 if self._semlock.name is not None:
76 # We only get here if we are on Unix with forking
77 # disabled. When the object is garbage collected or the
78 # process shuts down we unlink the semaphore name
79 from .resource_tracker import register
80 register(self._semlock.name, "semaphore")
81 util.Finalize(self, SemLock._cleanup, (self._semlock.name,),
82 exitpriority=0)
83
84 @staticmethod
85 def _cleanup(name):
86 from .resource_tracker import unregister
87 sem_unlink(name)
88 unregister(name, "semaphore")
89
90 def _make_methods(self):
91 self.acquire = self._semlock.acquire
92 self.release = self._semlock.release
93
94 def __enter__(self):
95 return self._semlock.__enter__()
96
97 def __exit__(self, *args):
98 return self._semlock.__exit__(*args)
99
100 def __getstate__(self):
101 context.assert_spawning(self)
102 sl = self._semlock
103 if sys.platform == 'win32':
104 h = context.get_spawning_popen().duplicate_for_child(sl.handle)
105 else:
106 if self._is_fork_ctx:
107 raise RuntimeError('A SemLock created in a fork context is being '
108 'shared with a process in a spawn context. This is '
109 'not supported. Please use the same context to create '
110 'multiprocessing objects and Process.')
111 h = sl.handle
112 return (h, sl.kind, sl.maxvalue, sl.name)
113
114 def __setstate__(self, state):
115 self._semlock = _multiprocessing.SemLock._rebuild(*state)
116 util.debug('recreated blocker with handle %r' % state[0])
117 self._make_methods()
118 # Ensure that deserialized SemLock can be serialized again (gh-108520).
119 self._is_fork_ctx = False
120
121 @staticmethod
122 def _make_name():
123 return '%s-%s' % (process.current_process()._config['semprefix'],
124 next(SemLock._rand))
125
126 #
127 # Semaphore
128 #
129
130 class ESC[4;38;5;81mSemaphore(ESC[4;38;5;149mSemLock):
131
132 def __init__(self, value=1, *, ctx):
133 SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX, ctx=ctx)
134
135 def get_value(self):
136 return self._semlock._get_value()
137
138 def __repr__(self):
139 try:
140 value = self._semlock._get_value()
141 except Exception:
142 value = 'unknown'
143 return '<%s(value=%s)>' % (self.__class__.__name__, value)
144
145 #
146 # Bounded semaphore
147 #
148
149 class ESC[4;38;5;81mBoundedSemaphore(ESC[4;38;5;149mSemaphore):
150
151 def __init__(self, value=1, *, ctx):
152 SemLock.__init__(self, SEMAPHORE, value, value, ctx=ctx)
153
154 def __repr__(self):
155 try:
156 value = self._semlock._get_value()
157 except Exception:
158 value = 'unknown'
159 return '<%s(value=%s, maxvalue=%s)>' % \
160 (self.__class__.__name__, value, self._semlock.maxvalue)
161
162 #
163 # Non-recursive lock
164 #
165
166 class ESC[4;38;5;81mLock(ESC[4;38;5;149mSemLock):
167
168 def __init__(self, *, ctx):
169 SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)
170
171 def __repr__(self):
172 try:
173 if self._semlock._is_mine():
174 name = process.current_process().name
175 if threading.current_thread().name != 'MainThread':
176 name += '|' + threading.current_thread().name
177 elif self._semlock._get_value() == 1:
178 name = 'None'
179 elif self._semlock._count() > 0:
180 name = 'SomeOtherThread'
181 else:
182 name = 'SomeOtherProcess'
183 except Exception:
184 name = 'unknown'
185 return '<%s(owner=%s)>' % (self.__class__.__name__, name)
186
187 #
188 # Recursive lock
189 #
190
191 class ESC[4;38;5;81mRLock(ESC[4;38;5;149mSemLock):
192
193 def __init__(self, *, ctx):
194 SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1, ctx=ctx)
195
196 def __repr__(self):
197 try:
198 if self._semlock._is_mine():
199 name = process.current_process().name
200 if threading.current_thread().name != 'MainThread':
201 name += '|' + threading.current_thread().name
202 count = self._semlock._count()
203 elif self._semlock._get_value() == 1:
204 name, count = 'None', 0
205 elif self._semlock._count() > 0:
206 name, count = 'SomeOtherThread', 'nonzero'
207 else:
208 name, count = 'SomeOtherProcess', 'nonzero'
209 except Exception:
210 name, count = 'unknown', 'unknown'
211 return '<%s(%s, %s)>' % (self.__class__.__name__, name, count)
212
213 #
214 # Condition variable
215 #
216
217 class ESC[4;38;5;81mCondition(ESC[4;38;5;149mobject):
218
219 def __init__(self, lock=None, *, ctx):
220 self._lock = lock or ctx.RLock()
221 self._sleeping_count = ctx.Semaphore(0)
222 self._woken_count = ctx.Semaphore(0)
223 self._wait_semaphore = ctx.Semaphore(0)
224 self._make_methods()
225
226 def __getstate__(self):
227 context.assert_spawning(self)
228 return (self._lock, self._sleeping_count,
229 self._woken_count, self._wait_semaphore)
230
231 def __setstate__(self, state):
232 (self._lock, self._sleeping_count,
233 self._woken_count, self._wait_semaphore) = state
234 self._make_methods()
235
236 def __enter__(self):
237 return self._lock.__enter__()
238
239 def __exit__(self, *args):
240 return self._lock.__exit__(*args)
241
242 def _make_methods(self):
243 self.acquire = self._lock.acquire
244 self.release = self._lock.release
245
246 def __repr__(self):
247 try:
248 num_waiters = (self._sleeping_count._semlock._get_value() -
249 self._woken_count._semlock._get_value())
250 except Exception:
251 num_waiters = 'unknown'
252 return '<%s(%s, %s)>' % (self.__class__.__name__, self._lock, num_waiters)
253
254 def wait(self, timeout=None):
255 assert self._lock._semlock._is_mine(), \
256 'must acquire() condition before using wait()'
257
258 # indicate that this thread is going to sleep
259 self._sleeping_count.release()
260
261 # release lock
262 count = self._lock._semlock._count()
263 for i in range(count):
264 self._lock.release()
265
266 try:
267 # wait for notification or timeout
268 return self._wait_semaphore.acquire(True, timeout)
269 finally:
270 # indicate that this thread has woken
271 self._woken_count.release()
272
273 # reacquire lock
274 for i in range(count):
275 self._lock.acquire()
276
277 def notify(self, n=1):
278 assert self._lock._semlock._is_mine(), 'lock is not owned'
279 assert not self._wait_semaphore.acquire(
280 False), ('notify: Should not have been able to acquire '
281 + '_wait_semaphore')
282
283 # to take account of timeouts since last notify*() we subtract
284 # woken_count from sleeping_count and rezero woken_count
285 while self._woken_count.acquire(False):
286 res = self._sleeping_count.acquire(False)
287 assert res, ('notify: Bug in sleeping_count.acquire'
288 + '- res should not be False')
289
290 sleepers = 0
291 while sleepers < n and self._sleeping_count.acquire(False):
292 self._wait_semaphore.release() # wake up one sleeper
293 sleepers += 1
294
295 if sleepers:
296 for i in range(sleepers):
297 self._woken_count.acquire() # wait for a sleeper to wake
298
299 # rezero wait_semaphore in case some timeouts just happened
300 while self._wait_semaphore.acquire(False):
301 pass
302
303 def notify_all(self):
304 self.notify(n=sys.maxsize)
305
306 def wait_for(self, predicate, timeout=None):
307 result = predicate()
308 if result:
309 return result
310 if timeout is not None:
311 endtime = time.monotonic() + timeout
312 else:
313 endtime = None
314 waittime = None
315 while not result:
316 if endtime is not None:
317 waittime = endtime - time.monotonic()
318 if waittime <= 0:
319 break
320 self.wait(waittime)
321 result = predicate()
322 return result
323
324 #
325 # Event
326 #
327
328 class ESC[4;38;5;81mEvent(ESC[4;38;5;149mobject):
329
330 def __init__(self, *, ctx):
331 self._cond = ctx.Condition(ctx.Lock())
332 self._flag = ctx.Semaphore(0)
333
334 def is_set(self):
335 with self._cond:
336 if self._flag.acquire(False):
337 self._flag.release()
338 return True
339 return False
340
341 def set(self):
342 with self._cond:
343 self._flag.acquire(False)
344 self._flag.release()
345 self._cond.notify_all()
346
347 def clear(self):
348 with self._cond:
349 self._flag.acquire(False)
350
351 def wait(self, timeout=None):
352 with self._cond:
353 if self._flag.acquire(False):
354 self._flag.release()
355 else:
356 self._cond.wait(timeout)
357
358 if self._flag.acquire(False):
359 self._flag.release()
360 return True
361 return False
362
363 def __repr__(self) -> str:
364 set_status = 'set' if self.is_set() else 'unset'
365 return f"<{type(self).__qualname__} at {id(self):#x} {set_status}>"
366 #
367 # Barrier
368 #
369
370 class ESC[4;38;5;81mBarrier(ESC[4;38;5;149mthreadingESC[4;38;5;149m.ESC[4;38;5;149mBarrier):
371
372 def __init__(self, parties, action=None, timeout=None, *, ctx):
373 import struct
374 from .heap import BufferWrapper
375 wrapper = BufferWrapper(struct.calcsize('i') * 2)
376 cond = ctx.Condition()
377 self.__setstate__((parties, action, timeout, cond, wrapper))
378 self._state = 0
379 self._count = 0
380
381 def __setstate__(self, state):
382 (self._parties, self._action, self._timeout,
383 self._cond, self._wrapper) = state
384 self._array = self._wrapper.create_memoryview().cast('i')
385
386 def __getstate__(self):
387 return (self._parties, self._action, self._timeout,
388 self._cond, self._wrapper)
389
390 @property
391 def _state(self):
392 return self._array[0]
393
394 @_state.setter
395 def _state(self, value):
396 self._array[0] = value
397
398 @property
399 def _count(self):
400 return self._array[1]
401
402 @_count.setter
403 def _count(self, value):
404 self._array[1] = value