1 #
2 # Module providing manager classes for dealing
3 # with shared objects
4 #
5 # multiprocessing/managers.py
6 #
7 # Copyright (c) 2006-2008, R Oudkerk
8 # Licensed to PSF under a Contributor Agreement.
9 #
10
11 __all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
12
13 #
14 # Imports
15 #
16
17 import sys
18 import threading
19 import signal
20 import array
21 import queue
22 import time
23 import types
24 import os
25 from os import getpid
26
27 from traceback import format_exc
28
29 from . import connection
30 from .context import reduction, get_spawning_popen, ProcessError
31 from . import pool
32 from . import process
33 from . import util
34 from . import get_context
35 try:
36 from . import shared_memory
37 except ImportError:
38 HAS_SHMEM = False
39 else:
40 HAS_SHMEM = True
41 __all__.append('SharedMemoryManager')
42
43 #
44 # Register some things for pickling
45 #
46
def reduce_array(a):
    '''
    Reduction function for `array.array`: return the constructor together
    with the typecode and byte content from which `a` can be rebuilt
    '''
    rebuild_args = (a.typecode, a.tobytes())
    return array.array, rebuild_args
reduction.register(array.array, reduce_array)


def rebuild_as_list(obj):
    '''
    Reduction function which rebuilds `obj` as a list of its elements
    '''
    return list, (list(obj),)

# Register the reducer for the three dict view types (items, keys, values);
# the temporary names are removed from the module namespace afterwards.
view_types = [type(view) for view in ({}.items(), {}.keys(), {}.values())]
for view_type in view_types:
    reduction.register(view_type, rebuild_as_list)
del view_type, view_types
57
58 #
59 # Type for identifying shared objects
60 #
61
class Token(object):
    '''
    Type to uniquely identify a shared object

    A token is the triple (typeid, address, id); the same triple is used
    as its pickled state.
    '''
    __slots__ = ('typeid', 'address', 'id')

    def __init__(self, typeid, address, id):
        self.typeid = typeid
        self.address = address
        self.id = id

    def __getstate__(self):
        return self.typeid, self.address, self.id

    def __setstate__(self, state):
        # unpack first so a malformed state leaves the token untouched
        typeid, address, ident = state
        self.typeid = typeid
        self.address = address
        self.id = ident

    def __repr__(self):
        fields = (self.__class__.__name__,
                  self.typeid, self.address, self.id)
        return '%s(typeid=%r, address=%r, id=%r)' % fields
80
81 #
82 # Function for communication with a manager's server process
83 #
84
def dispatch(c, id, methodname, args=(), kwds={}):
    '''
    Send the request (id, methodname, args, kwds) to the manager over
    connection `c` and return the result carried by a '#RETURN' reply

    A reply of any other kind is converted with convert_to_error() and
    raised.
    '''
    request = (id, methodname, args, kwds)
    c.send(request)
    reply = c.recv()
    kind, result = reply
    if kind != '#RETURN':
        raise convert_to_error(kind, result)
    return result
94
def convert_to_error(kind, result):
    '''
    Build the exception object which corresponds to a non-'#RETURN' reply

    The exception is returned rather than raised.  An '#ERROR' reply already
    carries the exception; '#TRACEBACK' and '#UNSERIALIZABLE' replies carry
    text which is wrapped in a RemoteError (TypeError is raised if that
    payload is not a str); any other kind yields a ValueError.
    '''
    if kind == '#ERROR':
        return result
    if kind not in ('#TRACEBACK', '#UNSERIALIZABLE'):
        return ValueError('Unrecognized message type {!r}'.format(kind))
    if not isinstance(result, str):
        raise TypeError(
            "Result {0!r} (kind '{1}') type is {2}, not str".format(
                result, kind, type(result)))
    if kind == '#UNSERIALIZABLE':
        return RemoteError('Unserializable message: %s\n' % result)
    return RemoteError(result)
109
class RemoteError(Exception):
    '''
    Exception whose string form frames `args[0]` between two dashed rules
    '''
    def __str__(self):
        rule = '-' * 75
        return '\n' + rule + '\n' + str(self.args[0]) + rule
113
114 #
115 # Functions for finding the method names of an object
116 #
117
def all_methods(obj):
    '''
    Return a list of the names of every callable attribute of `obj`
    '''
    return [name for name in dir(obj) if callable(getattr(obj, name))]
128
def public_methods(obj):
    '''
    Return a list of the names of methods of `obj` whose name does not
    begin with an underscore
    '''
    names = []
    for name in all_methods(obj):
        if name[0] != '_':
            names.append(name)
    return names
134
135 #
136 # Server which is run in a process controlled by a manager
137 #
138
class Server(object):
    '''
    Server class which runs in a process controlled by a manager object
    '''
    # Names of the methods which a connection request may invoke
    public = ['shutdown', 'create', 'accept_connection', 'get_methods',
              'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']

    def __init__(self, registry, address, authkey, serializer):
        '''
        Create the listener and the object bookkeeping tables

        `registry` maps a typeid to a
        (callable, exposed, method_to_typeid, proxytype) tuple and
        `serializer` is a key of `listener_client`.  Raises TypeError if
        `authkey` is not bytes.
        '''
        if not isinstance(authkey, bytes):
            raise TypeError(
                "Authkey {0!r} is type {1!s}, not bytes".format(
                    authkey, type(authkey)))
        self.registry = registry
        self.authkey = process.AuthenticationString(authkey)
        Listener, Client = listener_client[serializer]

        # do authentication later
        self.listener = Listener(address=address, backlog=16)
        self.address = self.listener.address

        # ident -> (obj, exposed, method_to_typeid); '0' is a placeholder
        # entry which refers to no object
        self.id_to_obj = {'0': (None, ())}
        self.id_to_refcount = {}
        self.id_to_local_proxy_obj = {}
        self.mutex = threading.Lock()

    def serve_forever(self):
        '''
        Run the server forever

        Connections are accepted in a daemon thread while this thread waits
        for the stop event; the method always ends with sys.exit(0).
        '''
        self.stop_event = threading.Event()
        process.current_process()._manager_server = self
        try:
            accepter = threading.Thread(target=self.accepter)
            accepter.daemon = True
            accepter.start()
            try:
                while not self.stop_event.is_set():
                    self.stop_event.wait(1)
            except (KeyboardInterrupt, SystemExit):
                pass
        finally:
            if sys.stdout != sys.__stdout__: # what about stderr?
                util.debug('resetting stdout, stderr')
                sys.stdout = sys.__stdout__
                sys.stderr = sys.__stderr__
            sys.exit(0)

    def accepter(self):
        '''
        Accept connections and hand each one to a new daemon thread
        '''
        while True:
            try:
                c = self.listener.accept()
            except OSError:
                continue
            t = threading.Thread(target=self.handle_request, args=(c,))
            t.daemon = True
            t.start()

    def _handle_request(self, c):
        '''
        Authenticate connection `c`, run the single request it carries and
        send back a ('#RETURN', result) or ('#TRACEBACK', text) reply
        '''
        request = None
        try:
            connection.deliver_challenge(c, self.authkey)
            connection.answer_challenge(c, self.authkey)
            request = c.recv()
            ignore, funcname, args, kwds = request
            # Checked explicitly rather than with `assert` so that names
            # outside `public` are still rejected when assertions are
            # disabled (python -O); the exception type is unchanged.
            if funcname not in self.public:
                raise AssertionError('%r unrecognized' % funcname)
            func = getattr(self, funcname)
        except Exception:
            msg = ('#TRACEBACK', format_exc())
        else:
            try:
                result = func(c, *args, **kwds)
            except Exception:
                msg = ('#TRACEBACK', format_exc())
            else:
                msg = ('#RETURN', result)

        try:
            c.send(msg)
        except Exception as e:
            # the reply itself could not be sent: try to report that instead
            try:
                c.send(('#TRACEBACK', format_exc()))
            except Exception:
                pass
            util.info('Failure to send message: %r', msg)
            util.info(' ... request was %r', request)
            util.info(' ... exception was %r', e)

    def handle_request(self, conn):
        '''
        Handle a new connection
        '''
        try:
            self._handle_request(conn)
        except SystemExit:
            # Server.serve_client() calls sys.exit(0) on EOF
            pass
        finally:
            conn.close()

    def serve_client(self, conn):
        '''
        Handle requests from the proxies in a particular process/thread
        '''
        util.debug('starting server thread to service %r',
                   threading.current_thread().name)

        recv = conn.recv
        send = conn.send
        id_to_obj = self.id_to_obj

        while not self.stop_event.is_set():

            try:
                methodname = obj = None
                request = recv()
                ident, methodname, args, kwds = request
                try:
                    obj, exposed, gettypeid = id_to_obj[ident]
                except KeyError as ke:
                    # fall back to objects referenced only by proxies
                    # which live in this server process
                    try:
                        obj, exposed, gettypeid = \
                            self.id_to_local_proxy_obj[ident]
                    except KeyError:
                        raise ke

                if methodname not in exposed:
                    raise AttributeError(
                        'method %r of %r object is not in exposed=%r' %
                        (methodname, type(obj), exposed)
                        )

                function = getattr(obj, methodname)

                try:
                    res = function(*args, **kwds)
                except Exception as e:
                    msg = ('#ERROR', e)
                else:
                    # a result whose method has a registered typeid is
                    # returned as a proxy for a new shared object
                    typeid = gettypeid and gettypeid.get(methodname, None)
                    if typeid:
                        rident, rexposed = self.create(conn, typeid, res)
                        token = Token(typeid, self.address, rident)
                        msg = ('#PROXY', (rexposed, token))
                    else:
                        msg = ('#RETURN', res)

            except AttributeError:
                if methodname is None:
                    msg = ('#TRACEBACK', format_exc())
                else:
                    # methods listed in fallback_mapping are served even
                    # when the referent does not expose them
                    try:
                        fallback_func = self.fallback_mapping[methodname]
                        result = fallback_func(
                            self, conn, ident, obj, *args, **kwds
                            )
                        msg = ('#RETURN', result)
                    except Exception:
                        msg = ('#TRACEBACK', format_exc())

            except EOFError:
                util.debug('got EOF -- exiting thread serving %r',
                           threading.current_thread().name)
                sys.exit(0)

            except Exception:
                msg = ('#TRACEBACK', format_exc())

            try:
                try:
                    send(msg)
                except Exception:
                    send(('#UNSERIALIZABLE', format_exc()))
            except Exception as e:
                util.info('exception in thread serving %r',
                          threading.current_thread().name)
                util.info(' ... message was %r', msg)
                util.info(' ... exception was %r', e)
                conn.close()
                sys.exit(1)

    def fallback_getvalue(self, conn, ident, obj):
        '''
        Fallback for '#GETVALUE': return the referent itself
        '''
        return obj

    def fallback_str(self, conn, ident, obj):
        '''
        Fallback for '__str__': return str() of the referent
        '''
        return str(obj)

    def fallback_repr(self, conn, ident, obj):
        '''
        Fallback for '__repr__': return repr() of the referent
        '''
        return repr(obj)

    # method name -> fallback used by serve_client() on AttributeError
    fallback_mapping = {
        '__str__':fallback_str,
        '__repr__':fallback_repr,
        '#GETVALUE':fallback_getvalue
        }

    def dummy(self, c):
        '''
        Do nothing
        '''
        pass

    def debug_info(self, c):
        '''
        Return some info --- useful to spot problems with refcounting
        '''
        # Perhaps include debug info about 'c'?
        with self.mutex:
            result = []
            keys = list(self.id_to_refcount.keys())
            keys.sort()
            for ident in keys:
                if ident != '0':
                    result.append(' %s: refcount=%s\n %s' %
                                  (ident, self.id_to_refcount[ident],
                                   str(self.id_to_obj[ident][0])[:75]))
            return '\n'.join(result)

    def number_of_objects(self, c):
        '''
        Number of shared objects
        '''
        # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
        return len(self.id_to_refcount)

    def shutdown(self, c):
        '''
        Shutdown this process
        '''
        try:
            util.debug('manager received shutdown message')
            c.send(('#RETURN', None))
        except:
            import traceback
            traceback.print_exc()
        finally:
            # the stop event is set even if the reply could not be sent
            self.stop_event.set()

    def create(self, c, typeid, /, *args, **kwds):
        '''
        Create a new shared object and return its id

        Returns the pair (ident, exposed) where `exposed` is a tuple of
        method names.  Raises ValueError if the typeid has no callable and
        the arguments are not exactly one positional object, and TypeError
        if the registered method_to_typeid is not a dict.
        '''
        with self.mutex:
            callable, exposed, method_to_typeid, proxytype = \
                      self.registry[typeid]

            if callable is None:
                if kwds or (len(args) != 1):
                    raise ValueError(
                        "Without callable, must have one non-keyword argument")
                obj = args[0]
            else:
                obj = callable(*args, **kwds)

            if exposed is None:
                exposed = public_methods(obj)
            if method_to_typeid is not None:
                if not isinstance(method_to_typeid, dict):
                    raise TypeError(
                        "Method_to_typeid {0!r}: type {1!s}, not dict".format(
                            method_to_typeid, type(method_to_typeid)))
                exposed = list(exposed) + list(method_to_typeid)

            ident = '%x' % id(obj)  # convert to string because xmlrpclib
                                    # only has 32 bit signed integers
            util.debug('%r callable returned object with id %r', typeid, ident)

            self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
            if ident not in self.id_to_refcount:
                self.id_to_refcount[ident] = 0

        # incref() acquires the mutex itself, so call it after releasing it
        self.incref(c, ident)
        return ident, tuple(exposed)

    def get_methods(self, c, token):
        '''
        Return the methods of the shared object indicated by token
        '''
        return tuple(self.id_to_obj[token.id][1])

    def accept_connection(self, c, name):
        '''
        Spawn a new thread to serve this connection
        '''
        threading.current_thread().name = name
        c.send(('#RETURN', None))
        self.serve_client(c)

    def incref(self, c, ident):
        '''
        Increment the reference count of the object with id `ident`

        Raises KeyError if the id is neither tracked nor stashed in
        `id_to_local_proxy_obj`.
        '''
        with self.mutex:
            try:
                self.id_to_refcount[ident] += 1
            except KeyError as ke:
                # If no external references exist but an internal (to the
                # manager) still does and a new external reference is created
                # from it, restore the manager's tracking of it from the
                # previously stashed internal ref.
                if ident in self.id_to_local_proxy_obj:
                    self.id_to_refcount[ident] = 1
                    self.id_to_obj[ident] = \
                        self.id_to_local_proxy_obj[ident]
                    util.debug('Server re-enabled tracking & INCREF %r', ident)
                else:
                    raise ke

    def decref(self, c, ident):
        '''
        Decrement the reference count of the object with id `ident` and
        dispose of the object once the count reaches zero
        '''
        if ident not in self.id_to_refcount and \
            ident in self.id_to_local_proxy_obj:
            util.debug('Server DECREF skipping %r', ident)
            return

        with self.mutex:
            if self.id_to_refcount[ident] <= 0:
                raise AssertionError(
                    "Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format(
                        ident, self.id_to_obj[ident],
                        self.id_to_refcount[ident]))
            self.id_to_refcount[ident] -= 1
            if self.id_to_refcount[ident] == 0:
                del self.id_to_refcount[ident]

        if ident not in self.id_to_refcount:
            # Two-step process in case the object turns out to contain other
            # proxy objects (e.g. a managed list of managed lists).
            # Otherwise, deleting self.id_to_obj[ident] would trigger the
            # deleting of the stored value (another managed object) which would
            # in turn attempt to acquire the mutex that is already held here.
            self.id_to_obj[ident] = (None, (), None)  # thread-safe
            util.debug('disposing of obj with id %r', ident)
            with self.mutex:
                del self.id_to_obj[ident]
466
467
468 #
469 # Class to represent state of a manager
470 #
471
class State(object):
    '''
    Holder for the state of a manager, kept in the single slot `value`
    '''
    __slots__ = ['value']
    # the values which `value` is given by the manager
    INITIAL, STARTED, SHUTDOWN = range(3)
477
478 #
479 # Mapping from serializer name to Listener and Client types
480 #
481
# Each serializer name maps to its (Listener, Client) pair
listener_client = dict(
    pickle=(connection.Listener, connection.Client),
    xmlrpclib=(connection.XmlListener, connection.XmlClient),
)
486
487 #
488 # Definition of BaseManager
489 #
490
class BaseManager(object):
    '''
    Base class for managers
    '''
    # typeid -> (callable, exposed, method_to_typeid, proxytype); copied
    # into a subclass the first time register() is called on it
    _registry = {}
    # server class used by get_server() and _run_server()
    _Server = Server

    def __init__(self, address=None, authkey=None, serializer='pickle',
                 ctx=None, *, shutdown_timeout=1.0):
        '''
        Record the connection parameters; no process is started here

        When `authkey` is None the current process's authkey is used, and
        when `ctx` is false get_context() supplies the context.
        '''
        if authkey is None:
            authkey = process.current_process().authkey
        self._address = address     # XXX not final address if eg ('', 0)
        self._authkey = process.AuthenticationString(authkey)
        self._state = State()
        self._state.value = State.INITIAL
        self._serializer = serializer
        self._Listener, self._Client = listener_client[serializer]
        self._ctx = ctx or get_context()
        self._shutdown_timeout = shutdown_timeout

    def get_server(self):
        '''
        Return server object with serve_forever() method and address attribute

        Raises ProcessError unless the manager is in its initial state.
        '''
        if self._state.value != State.INITIAL:
            if self._state.value == State.STARTED:
                raise ProcessError("Already started server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))
        # Use the overridable `_Server` attribute, as _run_server() does,
        # so that a subclass which replaces it gets its own server class.
        return self._Server(self._registry, self._address,
                            self._authkey, self._serializer)

    def connect(self):
        '''
        Connect manager object to the server process
        '''
        Client = listener_client[self._serializer][1]
        conn = Client(self._address, authkey=self._authkey)
        try:
            dispatch(conn, None, 'dummy')
        finally:
            # the connection only serves this one request
            conn.close()
        self._state.value = State.STARTED

    def start(self, initializer=None, initargs=()):
        '''
        Spawn a server process for this manager object

        If `initializer` is not None it is called with `initargs` in the
        server process before the server is created.  Raises ProcessError
        unless the manager is in its initial state and TypeError if
        `initializer` is not callable.
        '''
        if self._state.value != State.INITIAL:
            if self._state.value == State.STARTED:
                raise ProcessError("Already started server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))

        if initializer is not None and not callable(initializer):
            raise TypeError('initializer must be a callable')

        # pipe over which we will retrieve address of server
        reader, writer = connection.Pipe(duplex=False)

        # spawn process which runs a server
        self._process = self._ctx.Process(
            target=type(self)._run_server,
            args=(self._registry, self._address, self._authkey,
                  self._serializer, writer, initializer, initargs),
            )
        ident = ':'.join(str(i) for i in self._process._identity)
        self._process.name = type(self).__name__ + '-' + ident
        self._process.start()

        # get address of server
        writer.close()
        self._address = reader.recv()
        reader.close()

        # register a finalizer
        self._state.value = State.STARTED
        self.shutdown = util.Finalize(
            self, type(self)._finalize_manager,
            args=(self._process, self._address, self._authkey, self._state,
                  self._Client, self._shutdown_timeout),
            exitpriority=0
            )

    @classmethod
    def _run_server(cls, registry, address, authkey, serializer, writer,
                    initializer=None, initargs=()):
        '''
        Create a server, report its address and run it
        '''
        # bpo-36368: protect server process from KeyboardInterrupt signals
        signal.signal(signal.SIGINT, signal.SIG_IGN)

        if initializer is not None:
            initializer(*initargs)

        # create server
        server = cls._Server(registry, address, authkey, serializer)

        # inform parent process of the server's address
        writer.send(server.address)
        writer.close()

        # run the manager
        util.info('manager serving at %r', server.address)
        server.serve_forever()

    def _create(self, typeid, /, *args, **kwds):
        '''
        Create a new shared object; return the token and exposed tuple
        '''
        # explicit check so that it is not stripped under python -O
        if self._state.value != State.STARTED:
            raise AssertionError('server not yet started')
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
        finally:
            conn.close()
        return Token(typeid, self._address, id), exposed

    def join(self, timeout=None):
        '''
        Join the manager process (if it has been spawned)
        '''
        # `_process` is only assigned by start(); a manager which was never
        # started has nothing to join
        proc = getattr(self, '_process', None)
        if proc is not None:
            proc.join(timeout)
            if not proc.is_alive():
                self._process = None

    def _debug_info(self):
        '''
        Return some info about the servers shared objects and connections
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'debug_info')
        finally:
            conn.close()

    def _number_of_objects(self):
        '''
        Return the number of shared objects
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'number_of_objects')
        finally:
            conn.close()

    def __enter__(self):
        '''
        Start the manager if it is still in its initial state; return self

        Raises ProcessError if the manager is not started afterwards.
        '''
        if self._state.value == State.INITIAL:
            self.start()
        if self._state.value != State.STARTED:
            if self._state.value == State.INITIAL:
                raise ProcessError("Unable to start server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # `shutdown` is the finalizer which start() installs
        self.shutdown()

    @staticmethod
    def _finalize_manager(process, address, authkey, state, _Client,
                          shutdown_timeout):
        '''
        Shutdown the manager process; will be registered as a finalizer
        '''
        if process.is_alive():
            util.info('sending shutdown message to manager')
            try:
                conn = _Client(address, authkey=authkey)
                try:
                    dispatch(conn, None, 'shutdown')
                finally:
                    conn.close()
            except Exception:
                # best effort: fall through to joining/terminating below
                pass

            process.join(timeout=shutdown_timeout)
            if process.is_alive():
                util.info('manager still alive')
                if hasattr(process, 'terminate'):
                    util.info('trying to `terminate()` manager process')
                    process.terminate()
                    process.join(timeout=shutdown_timeout)
                    if process.is_alive():
                        util.info('manager still alive after terminate')
                        process.kill()
                        process.join()

        state.value = State.SHUTDOWN
        try:
            del BaseProxy._address_to_local[address]
        except KeyError:
            pass

    @property
    def address(self):
        return self._address

    @classmethod
    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
                 method_to_typeid=None, create_method=True):
        '''
        Register a typeid with the manager type

        `proxytype` defaults to AutoProxy; `exposed` and `method_to_typeid`
        default to the proxy type's `_exposed_` and `_method_to_typeid_`
        attributes.  When `create_method` is true a method named `typeid`
        is added to the class which creates a shared object of that type
        and returns a proxy for it.
        '''
        if '_registry' not in cls.__dict__:
            cls._registry = cls._registry.copy()

        if proxytype is None:
            proxytype = AutoProxy

        exposed = exposed or getattr(proxytype, '_exposed_', None)

        method_to_typeid = method_to_typeid or \
                           getattr(proxytype, '_method_to_typeid_', None)

        if method_to_typeid:
            # explicit checks so that they are not stripped under python -O
            for key, value in list(method_to_typeid.items()): # isinstance?
                if type(key) is not str:
                    raise AssertionError('%r is not a string' % key)
                if type(value) is not str:
                    raise AssertionError('%r is not a string' % value)

        cls._registry[typeid] = (
            callable, exposed, method_to_typeid, proxytype
            )

        if create_method:
            def temp(self, /, *args, **kwds):
                util.debug('requesting creation of a shared %r object', typeid)
                token, exp = self._create(typeid, *args, **kwds)
                proxy = proxytype(
                    token, self._serializer, manager=self,
                    authkey=self._authkey, exposed=exp
                    )
                # the proxy now holds its own reference, so release the one
                # taken when the object was created; the connection only
                # serves this one request and is closed afterwards
                conn = self._Client(token.address, authkey=self._authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
                return proxy
            temp.__name__ = typeid
            setattr(cls, typeid, temp)
736
737 #
# Subclass of set which gets cleared after a fork
739 #
740
class ProcessLocalSet(set):
    '''
    Set which registers itself to be cleared after a fork and which
    reduces to a new empty set of the same type
    '''
    def __init__(self):
        def _clear(obj):
            obj.clear()
        util.register_after_fork(self, _clear)

    def __reduce__(self):
        return type(self), ()
746
747 #
748 # Definition of BaseProxy
749 #
750
class BaseProxy(object):
    '''
    A base for proxies of shared objects
    '''
    # manager address -> (thread-local connection holder, set of ids)
    _address_to_local = {}
    _mutex = util.ForkAwareThreadLock()

    def __init__(self, token, serializer, manager=None,
                 authkey=None, exposed=None, incref=True, manager_owned=False):
        '''
        Create a proxy for the shared object identified by `token`

        The authentication key is `authkey` if given, else the manager's
        key, else the current process's key.  When `incref` is true the
        reference count of the referent is incremented.
        '''
        with BaseProxy._mutex:
            tls_idset = BaseProxy._address_to_local.get(token.address, None)
            if tls_idset is None:
                tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
                BaseProxy._address_to_local[token.address] = tls_idset

        # self._tls is used to record the connection used by this
        # thread to communicate with the manager at token.address
        self._tls = tls_idset[0]

        # self._idset is used to record the identities of all shared
        # objects for which the current process owns references and
        # which are in the manager at token.address
        self._idset = tls_idset[1]

        self._token = token
        self._id = self._token.id
        self._manager = manager
        self._serializer = serializer
        self._Client = listener_client[serializer][1]

        # Should be set to True only when a proxy object is being created
        # on the manager server; primary use case: nested proxy objects.
        # RebuildProxy detects when a proxy is being created on the manager
        # and sets this value appropriately.
        self._owned_by_manager = manager_owned

        if authkey is not None:
            self._authkey = process.AuthenticationString(authkey)
        elif self._manager is not None:
            self._authkey = self._manager._authkey
        else:
            self._authkey = process.current_process().authkey

        if incref:
            self._incref()

        util.register_after_fork(self, BaseProxy._after_fork)

    def _connect(self):
        '''
        Open this thread's connection to the manager and store it in
        `self._tls.connection`
        '''
        util.debug('making connection to manager')
        name = process.current_process().name
        if threading.current_thread().name != 'MainThread':
            name += '|' + threading.current_thread().name
        conn = self._Client(self._token.address, authkey=self._authkey)
        dispatch(conn, None, 'accept_connection', (name,))
        self._tls.connection = conn

    def _callmethod(self, methodname, args=(), kwds={}):
        '''
        Try to call a method of the referent and return a copy of the result

        A '#PROXY' reply is turned into a proxy of the registered type;
        any reply other than '#RETURN' or '#PROXY' is raised as an error.
        '''
        try:
            conn = self._tls.connection
        except AttributeError:
            util.debug('thread %r does not own a connection',
                       threading.current_thread().name)
            self._connect()
            conn = self._tls.connection

        conn.send((self._id, methodname, args, kwds))
        kind, result = conn.recv()

        if kind == '#RETURN':
            return result
        elif kind == '#PROXY':
            exposed, token = result
            proxytype = self._manager._registry[token.typeid][-1]
            token.address = self._token.address
            proxy = proxytype(
                token, self._serializer, manager=self._manager,
                authkey=self._authkey, exposed=exposed
                )
            # Release the reference taken when the object was created; this
            # uses a separate one-shot connection (not this thread's
            # persistent one) which is closed once the request is done.
            decref_conn = self._Client(token.address, authkey=self._authkey)
            try:
                dispatch(decref_conn, None, 'decref', (token.id,))
            finally:
                decref_conn.close()
            return proxy
        raise convert_to_error(kind, result)

    def _getvalue(self):
        '''
        Get a copy of the value of the referent
        '''
        return self._callmethod('#GETVALUE')

    def _incref(self):
        '''
        Increment the referent's reference count in the manager and
        register the finalizer which decrements it again
        '''
        if self._owned_by_manager:
            util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
            return

        conn = self._Client(self._token.address, authkey=self._authkey)
        try:
            dispatch(conn, None, 'incref', (self._id,))
        finally:
            # one-shot connection: close it instead of leaving it to the
            # garbage collector
            conn.close()
        util.debug('INCREF %r', self._token.id)

        self._idset.add(self._id)

        state = self._manager and self._manager._state

        self._close = util.Finalize(
            self, BaseProxy._decref,
            args=(self._token, self._authkey, state,
                  self._tls, self._idset, self._Client),
            exitpriority=10
            )

    @staticmethod
    def _decref(token, authkey, state, tls, idset, _Client):
        '''
        Finalizer: tell the manager that this process has dropped a
        reference to the object identified by `token`
        '''
        idset.discard(token.id)

        # check whether manager is still alive
        if state is None or state.value == State.STARTED:
            # tell manager this process no longer cares about referent
            try:
                util.debug('DECREF %r', token.id)
                conn = _Client(token.address, authkey=authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
            except Exception as e:
                util.debug('... decref failed %s', e)

        else:
            util.debug('DECREF %r -- manager already shutdown', token.id)

        # check whether we can close this thread's connection because
        # the process owns no more references to objects for this manager
        if not idset and hasattr(tls, 'connection'):
            util.debug('thread %r has no more proxies so closing conn',
                       threading.current_thread().name)
            tls.connection.close()
            del tls.connection

    def _after_fork(self):
        self._manager = None
        try:
            self._incref()
        except Exception as e:
            # the proxy may just be for a manager which has shutdown
            util.info('incref failed: %s' % e)

    def __reduce__(self):
        kwds = {}
        # the authkey is only included while a process is being spawned
        if get_spawning_popen() is not None:
            kwds['authkey'] = self._authkey

        if getattr(self, '_isauto', False):
            kwds['exposed'] = self._exposed_
            return (RebuildProxy,
                    (AutoProxy, self._token, self._serializer, kwds))
        else:
            return (RebuildProxy,
                    (type(self), self._token, self._serializer, kwds))

    def __deepcopy__(self, memo):
        return self._getvalue()

    def __repr__(self):
        return '<%s object, typeid %r at %#x>' % \
               (type(self).__name__, self._token.typeid, id(self))

    def __str__(self):
        '''
        Return representation of the referent (or a fall-back if that fails)
        '''
        try:
            return self._callmethod('__repr__')
        except Exception:
            return repr(self)[:-1] + "; '__str__()' failed>"
925
926 #
927 # Function used for unpickling
928 #
929
def RebuildProxy(func, token, serializer, kwds):
    '''
    Function used for unpickling proxy objects.

    Calls `func(token, serializer, incref=..., **kwds)`; `kwds` is modified
    in place.  When the current process runs the manager server which owns
    `token`, the proxy is flagged as manager owned.
    '''
    current = process.current_process()
    server = getattr(current, '_manager_server', None)
    if server and server.address == token.address:
        util.debug('Rebuild a proxy owned by manager, token=%r', token)
        kwds['manager_owned'] = True
        local_objs = server.id_to_local_proxy_obj
        if token.id not in local_objs:
            local_objs[token.id] = server.id_to_obj[token.id]
    # no incref while the current process is flagged as `_inheriting`
    incref = kwds.pop('incref', True)
    if incref:
        incref = not getattr(current, '_inheriting', False)
    return func(token, serializer, incref=incref, **kwds)
946
947 #
948 # Functions to create proxies and proxy types
949 #
950
def MakeProxyType(name, exposed, _cache={}):
    '''
    Return a proxy type whose methods are given by `exposed`

    Types are cached by (name, exposed), so repeated calls with the same
    arguments return the same class.
    '''
    exposed = tuple(exposed)
    key = (name, exposed)
    if key in _cache:
        return _cache[key]

    # each exposed method forwards its arguments to _callmethod()
    template = '''def %s(self, /, *args, **kwds):
    return self._callmethod(%r, args, kwds)'''
    namespace = {}
    for meth in exposed:
        exec(template % (meth, meth), namespace)

    ProxyType = type(name, (BaseProxy,), namespace)
    ProxyType._exposed_ = exposed
    _cache[key] = ProxyType
    return ProxyType
971
972
def AutoProxy(token, serializer, manager=None, authkey=None,
              exposed=None, incref=True, manager_owned=False):
    '''
    Return an auto-proxy for `token`

    When `exposed` is None the method names are requested from the manager
    at `token.address`.  The authentication key is `authkey` if given, else
    the manager's key, else the current process's key.
    '''
    _Client = listener_client[serializer][1]

    # Resolve the key before connecting, so that the get_methods request
    # below is authenticated with the same key as the proxy itself instead
    # of being sent with no key at all.
    if authkey is None and manager is not None:
        authkey = manager._authkey
    if authkey is None:
        authkey = process.current_process().authkey

    if exposed is None:
        conn = _Client(token.address, authkey=authkey)
        try:
            exposed = dispatch(conn, None, 'get_methods', (token,))
        finally:
            conn.close()

    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
                      incref=incref, manager_owned=manager_owned)
    # marks the proxy so that BaseProxy.__reduce__ pickles it as an AutoProxy
    proxy._isauto = True
    return proxy
997
998 #
999 # Types/callables which we will register with SyncManager
1000 #
1001
class Namespace(object):
    '''
    Simple object whose attributes are set from keyword arguments
    '''
    def __init__(self, /, **kwds):
        self.__dict__.update(kwds)

    def __repr__(self):
        # attributes whose name starts with '_' are left out of the repr
        shown = sorted(
            '%s=%r' % (name, value)
            for name, value in list(self.__dict__.items())
            if not name.startswith('_')
        )
        return '%s(%s)' % (self.__class__.__name__, ', '.join(shown))
1013
class Value(object):
    '''
    Holder for a typecode and a value; `lock` is accepted but not used
    '''
    def __init__(self, typecode, value, lock=True):
        self._typecode = typecode
        self._value = value

    def get(self):
        return self._value

    def set(self, value):
        self._value = value

    def __repr__(self):
        return '{}({!r}, {!r})'.format(
            type(self).__name__, self._typecode, self._value)

    # attribute-style access to get() and set()
    value = property(get, set)
1025
def Array(typecode, sequence, lock=True):
    '''
    Return an `array.array` with the given typecode initialised from
    `sequence`; `lock` is accepted but not used
    '''
    arr = array.array(typecode, sequence)
    return arr
1028
1029 #
1030 # Proxy types used by SyncManager
1031 #
1032
class IteratorProxy(BaseProxy):
    '''
    Proxy whose iteration methods are forwarded to the referent
    '''
    _exposed_ = ('__next__', 'send', 'throw', 'close')

    def __iter__(self):
        # the proxy is its own iterator
        return self

    def __next__(self, *args):
        return self._callmethod('__next__', args)

    def send(self, *args):
        return self._callmethod('send', args)

    def throw(self, *args):
        return self._callmethod('throw', args)

    def close(self, *args):
        return self._callmethod('close', args)
1045
1046
class AcquirerProxy(BaseProxy):
    '''
    Proxy for an object with acquire() and release() methods; usable as a
    context manager
    '''
    _exposed_ = ('acquire', 'release')

    def acquire(self, blocking=True, timeout=None):
        # the timeout is only forwarded when one was given
        if timeout is None:
            args = (blocking,)
        else:
            args = (blocking, timeout)
        return self._callmethod('acquire', args)

    def release(self):
        return self._callmethod('release')

    def __enter__(self):
        return self._callmethod('acquire')

    def __exit__(self, exc_type, exc_val, exc_tb):
        return self._callmethod('release')
1058
1059
class ConditionProxy(AcquirerProxy):
    '''
    Proxy which adds wait(), notify(), notify_all() and wait_for() to
    AcquirerProxy
    '''
    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')

    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))

    def notify(self, n=1):
        return self._callmethod('notify', (n,))

    def notify_all(self):
        return self._callmethod('notify_all')

    def wait_for(self, predicate, timeout=None):
        '''
        Wait until `predicate()` is true or `timeout` seconds have passed;
        return the last value of the predicate

        The predicate is evaluated in the calling process.
        '''
        result = predicate()
        if result:
            return result
        deadline = None if timeout is None else time.monotonic() + timeout
        remaining = None
        while not result:
            if deadline is not None:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
            self.wait(remaining)
            result = predicate()
        return result
1085
1086
class EventProxy(BaseProxy):
    '''
    Proxy which forwards is_set(), set(), clear() and wait() to the referent
    '''
    _exposed_ = ('is_set', 'set', 'clear', 'wait')

    def is_set(self):
        return self._callmethod('is_set')

    def set(self):
        return self._callmethod('set')

    def clear(self):
        return self._callmethod('clear')

    def wait(self, timeout=None):
        wait_args = (timeout,)
        return self._callmethod('wait', wait_args)
1097
1098
class BarrierProxy(BaseProxy):
    '''
    Proxy exposing `wait`, `abort`, `reset` and three read-only properties.

    The properties are read by forwarding `__getattribute__` with the
    attribute name.
    '''
    _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')

    def wait(self, timeout=None):
        'Forward `wait(timeout)` to the referent.'
        return self._callmethod('wait', (timeout,))

    def abort(self):
        'Forward `abort()` to the referent.'
        return self._callmethod('abort')

    def reset(self):
        'Forward `reset()` to the referent.'
        return self._callmethod('reset')

    @property
    def parties(self):
        'Value of the referent\'s `parties` attribute.'
        return self._callmethod('__getattribute__', ('parties',))

    @property
    def n_waiting(self):
        'Value of the referent\'s `n_waiting` attribute.'
        return self._callmethod('__getattribute__', ('n_waiting',))

    @property
    def broken(self):
        'Value of the referent\'s `broken` attribute.'
        return self._callmethod('__getattribute__', ('broken',))
1116
1117
class NamespaceProxy(BaseProxy):
    '''
    Proxy that forwards attribute get/set/delete to the referent.

    Names beginning with an underscore are handled on the proxy object
    itself; every other name (including the empty string) is forwarded
    through `_callmethod`.
    '''
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')

    def __getattr__(self, key):
        # startswith() rather than key[0]: an empty name must not raise
        # IndexError, which would escape getattr(..., default) and hasattr().
        if key.startswith('_'):
            return object.__getattribute__(self, key)
        # Fetch _callmethod via object.__getattribute__ so the lookup
        # cannot re-enter this method.
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__getattribute__', (key,))

    def __setattr__(self, key, value):
        if key.startswith('_'):
            return object.__setattr__(self, key, value)
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__setattr__', (key, value))

    def __delattr__(self, key):
        if key.startswith('_'):
            return object.__delattr__(self, key)
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__delattr__', (key,))
1135
1136
class ValueProxy(BaseProxy):
    '''
    Proxy exposing `get` and `set`, also available as the `value` property.
    '''
    _exposed_ = ('get', 'set')

    def get(self):
        'Forward `get()` to the referent.'
        return self._callmethod('get')

    def set(self, value):
        'Forward `set(value)` to the referent.'
        return self._callmethod('set', (value,))

    # Attribute-style access built from the two forwarding methods above.
    value = property(fget=get, fset=set)

    # Allows subscripting the class, e.g. ValueProxy[int].
    __class_getitem__ = classmethod(types.GenericAlias)
1146
1147
# Generated proxy type carrying the exposed list methods.
BaseListProxy = MakeProxyType(
    'BaseListProxy',
    (
        '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
        '__mul__', '__reversed__', '__rmul__', '__setitem__',
        'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
        'reverse', 'sort', '__imul__',
    ),
)


class ListProxy(BaseListProxy):
    '''
    List proxy whose in-place operators return the proxy itself.
    '''

    def __iadd__(self, value):
        # `+=` is carried out as an `extend` call on the referent.
        self._callmethod('extend', (value,))
        return self

    def __imul__(self, value):
        # Discard the forwarded call's result and hand back the proxy.
        self._callmethod('__imul__', (value,))
        return self
1161
1162
# Generated proxy type carrying the exposed dict methods.
DictProxy = MakeProxyType(
    'DictProxy',
    (
        '__contains__', '__delitem__', '__getitem__', '__iter__', '__len__',
        '__setitem__', 'clear', 'copy', 'get', 'items',
        'keys', 'pop', 'popitem', 'setdefault', 'update', 'values',
    ),
)
# `__iter__` is mapped to the 'Iterator' typeid.
DictProxy._method_to_typeid_ = {'__iter__': 'Iterator'}
1171
1172
# Generated proxy type exposing only length and item get/set.
ArrayProxy = MakeProxyType(
    'ArrayProxy',
    ('__len__', '__getitem__', '__setitem__'),
)
1176
1177
# Generated proxy type carrying the exposed pool methods.
BasePoolProxy = MakeProxyType(
    'PoolProxy',
    (
        'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
        'map', 'map_async', 'starmap', 'starmap_async', 'terminate',
    ),
)
# Typeids assigned to the results of the asynchronous and lazy methods.
BasePoolProxy._method_to_typeid_ = {
    'apply_async': 'AsyncResult',
    'map_async': 'AsyncResult',
    'starmap_async': 'AsyncResult',
    'imap': 'Iterator',
    'imap_unordered': 'Iterator',
}


class PoolProxy(BasePoolProxy):
    '''
    Pool proxy usable as a context manager; leaving the block calls
    `terminate()`.
    '''

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.terminate()
1194
1195 #
1196 # Definition of SyncManager
1197 #
1198
class SyncManager(BaseManager):
    '''
    A `BaseManager` subclass with a set of shared object types registered.

    Registered types are the thread-synchronization primitives together
    with `dict`, `list` and `Namespace`.

    Started instances of this class are what the
    `multiprocessing.Manager()` function creates.
    '''


# Queues are registered without an explicit proxy type.
SyncManager.register('Queue', queue.Queue)
SyncManager.register('JoinableQueue', queue.Queue)

# Synchronization primitives.
SyncManager.register('Event', threading.Event, EventProxy)
SyncManager.register('Lock', threading.Lock, AcquirerProxy)
SyncManager.register('RLock', threading.RLock, AcquirerProxy)
SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
SyncManager.register(
    'BoundedSemaphore', threading.BoundedSemaphore, AcquirerProxy
)
SyncManager.register('Condition', threading.Condition, ConditionProxy)
SyncManager.register('Barrier', threading.Barrier, BarrierProxy)

# Worker pool, containers and value holders.
SyncManager.register('Pool', pool.Pool, PoolProxy)
SyncManager.register('list', list, ListProxy)
SyncManager.register('dict', dict, DictProxy)
SyncManager.register('Value', Value, ValueProxy)
SyncManager.register('Array', Array, ArrayProxy)
SyncManager.register('Namespace', Namespace, NamespaceProxy)

# types returned by methods of PoolProxy
SyncManager.register(
    'Iterator', proxytype=IteratorProxy, create_method=False
)
SyncManager.register('AsyncResult', create_method=False)
1230
1231 #
1232 # Definition of SharedMemoryManager and SharedMemoryServer
1233 #
1234
if HAS_SHMEM:
    class _SharedMemoryTracker:
        "Manages one or more shared memory segments."

        def __init__(self, name, segment_names=None):
            """Create a tracker called `name`.

            `segment_names` is the list of block names to start with.  When
            omitted, a new empty list is created for this instance; a
            supplied list is stored as-is (not copied).
            """
            self.shared_memory_context_name = name
            # A literal [] default would be one list shared by every tracker
            # constructed without an explicit argument.
            self.segment_names = (
                [] if segment_names is None else segment_names
            )

        def register_segment(self, segment_name):
            "Adds the supplied shared memory block name to tracker."
            util.debug(f"Register segment {segment_name!r} in pid {getpid()}")
            self.segment_names.append(segment_name)

        def destroy_segment(self, segment_name):
            """Calls unlink() on the shared memory block with the supplied name
            and removes it from the list of blocks being tracked."""
            util.debug(f"Destroy segment {segment_name!r} in pid {getpid()}")
            # Untrack first; list.remove raises ValueError for unknown names.
            self.segment_names.remove(segment_name)
            segment = shared_memory.SharedMemory(segment_name)
            segment.close()
            segment.unlink()

        def unlink(self):
            "Calls destroy_segment() on all tracked shared memory blocks."
            # Iterate over a copy: destroy_segment() mutates the list.
            for segment_name in self.segment_names[:]:
                self.destroy_segment(segment_name)

        def __del__(self):
            util.debug(f"Call {self.__class__.__name__}.__del__ in {getpid()}")
            self.unlink()

        def __getstate__(self):
            return (self.shared_memory_context_name, self.segment_names)

        def __setstate__(self, state):
            # State is the (name, segment_names) pair from __getstate__.
            self.__init__(*state)


    class SharedMemoryServer(Server):
        """Server that additionally owns a _SharedMemoryTracker and exposes
        methods to track, release and list shared memory blocks."""

        public = Server.public + \
                 ['track_segment', 'release_segment', 'list_segments']

        def __init__(self, *args, **kwargs):
            Server.__init__(self, *args, **kwargs)
            address = self.address
            # The address of Linux abstract namespaces can be bytes
            if isinstance(address, bytes):
                address = os.fsdecode(address)
            self.shared_memory_context = \
                _SharedMemoryTracker(f"shm_{address}_{getpid()}")
            util.debug(f"SharedMemoryServer started by pid {getpid()}")

        def create(self, c, typeid, /, *args, **kwargs):
            """Create a new distributed-shared object (not backed by a shared
            memory block) and return its id to be used in a Proxy Object."""
            # Unless set up as a shared proxy, don't make shared_memory_context
            # a standard part of kwargs.  This makes things easier for supplying
            # simple functions.
            if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"):
                kwargs['shared_memory_context'] = self.shared_memory_context
            return Server.create(self, c, typeid, *args, **kwargs)

        def shutdown(self, c):
            "Call unlink() on all tracked shared memory, terminate the Server."
            self.shared_memory_context.unlink()
            return Server.shutdown(self, c)

        def track_segment(self, c, segment_name):
            "Adds the supplied shared memory block name to Server's tracker."
            self.shared_memory_context.register_segment(segment_name)

        def release_segment(self, c, segment_name):
            """Calls unlink() on the shared memory block with the supplied name
            and removes it from the tracker instance inside the Server."""
            self.shared_memory_context.destroy_segment(segment_name)

        def list_segments(self, c):
            """Returns a list of names of shared memory blocks that the Server
            is currently tracking."""
            return self.shared_memory_context.segment_names


    class SharedMemoryManager(BaseManager):
        """Like SyncManager but uses SharedMemoryServer instead of Server.

        It provides methods for creating and returning SharedMemory instances
        and for creating a list-like object (ShareableList) backed by shared
        memory.  It also provides methods that create and return Proxy Objects
        that support synchronization across processes (i.e. multi-process-safe
        locks and semaphores).
        """

        _Server = SharedMemoryServer

        def __init__(self, *args, **kwargs):
            if os.name == "posix":
                # bpo-36867: Ensure the resource_tracker is running before
                # launching the manager process, so that concurrent
                # shared_memory manipulation both in the manager and in the
                # current process does not create two resource_tracker
                # processes.
                from . import resource_tracker
                resource_tracker.ensure_running()
            BaseManager.__init__(self, *args, **kwargs)
            util.debug(f"{self.__class__.__name__} created by pid {getpid()}")

        def __del__(self):
            util.debug(f"{self.__class__.__name__}.__del__ by pid {getpid()}")

        def get_server(self):
            """Return a new `_Server` instance built from this manager's
            registry, address, authkey and serializer.

            Better than monkeypatching for now; merge into Server ultimately.

            Raises ProcessError unless the manager is in its initial state.
            """
            if self._state.value != State.INITIAL:
                if self._state.value == State.STARTED:
                    raise ProcessError("Already started SharedMemoryServer")
                elif self._state.value == State.SHUTDOWN:
                    raise ProcessError("SharedMemoryManager has shut down")
                else:
                    raise ProcessError(
                        "Unknown state {!r}".format(self._state.value))
            return self._Server(self._registry, self._address,
                                self._authkey, self._serializer)

        def SharedMemory(self, size):
            """Returns a new SharedMemory instance with the specified size in
            bytes, to be tracked by the manager."""
            with self._Client(self._address, authkey=self._authkey) as conn:
                sms = shared_memory.SharedMemory(None, create=True, size=size)
                try:
                    dispatch(conn, None, 'track_segment', (sms.name,))
                except BaseException:
                    # The server never learned about the block, so remove it
                    # here before propagating the original exception.
                    sms.unlink()
                    raise
            return sms

        def ShareableList(self, sequence):
            """Returns a new ShareableList instance populated with the values
            from the input sequence, to be tracked by the manager."""
            with self._Client(self._address, authkey=self._authkey) as conn:
                sl = shared_memory.ShareableList(sequence)
                try:
                    dispatch(conn, None, 'track_segment', (sl.shm.name,))
                except BaseException:
                    # Same cleanup as SharedMemory(): unlink the untracked
                    # block, then re-raise unchanged.
                    sl.shm.unlink()
                    raise
            return sl