1 #
2 # Module providing manager classes for dealing
3 # with shared objects
4 #
5 # multiprocessing/managers.py
6 #
7 # Copyright (c) 2006-2008, R Oudkerk
8 # Licensed to PSF under a Contributor Agreement.
9 #
10
11 __all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
12
13 #
14 # Imports
15 #
16
17 import sys
18 import threading
19 import signal
20 import array
21 import queue
22 import time
23 import types
24 import os
25 from os import getpid
26
27 from traceback import format_exc
28
29 from . import connection
30 from .context import reduction, get_spawning_popen, ProcessError
31 from . import pool
32 from . import process
33 from . import util
34 from . import get_context
35 try:
36 from . import shared_memory
37 except ImportError:
38 HAS_SHMEM = False
39 else:
40 HAS_SHMEM = True
41 __all__.append('SharedMemoryManager')
42
43 #
44 # Register some things for pickling
45 #
46
47 def reduce_array(a):
48 return array.array, (a.typecode, a.tobytes())
49 reduction.register(array.array, reduce_array)
50
51 view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
52 def rebuild_as_list(obj):
53 return list, (list(obj),)
54 for view_type in view_types:
55 reduction.register(view_type, rebuild_as_list)
56 del view_type, view_types
57
58 #
59 # Type for identifying shared objects
60 #
61
62 class ESC[4;38;5;81mToken(ESC[4;38;5;149mobject):
63 '''
64 Type to uniquely identify a shared object
65 '''
66 __slots__ = ('typeid', 'address', 'id')
67
68 def __init__(self, typeid, address, id):
69 (self.typeid, self.address, self.id) = (typeid, address, id)
70
71 def __getstate__(self):
72 return (self.typeid, self.address, self.id)
73
74 def __setstate__(self, state):
75 (self.typeid, self.address, self.id) = state
76
77 def __repr__(self):
78 return '%s(typeid=%r, address=%r, id=%r)' % \
79 (self.__class__.__name__, self.typeid, self.address, self.id)
80
81 #
82 # Function for communication with a manager's server process
83 #
84
85 def dispatch(c, id, methodname, args=(), kwds={}):
86 '''
87 Send a message to manager using connection `c` and return response
88 '''
89 c.send((id, methodname, args, kwds))
90 kind, result = c.recv()
91 if kind == '#RETURN':
92 return result
93 raise convert_to_error(kind, result)
94
95 def convert_to_error(kind, result):
96 if kind == '#ERROR':
97 return result
98 elif kind in ('#TRACEBACK', '#UNSERIALIZABLE'):
99 if not isinstance(result, str):
100 raise TypeError(
101 "Result {0!r} (kind '{1}') type is {2}, not str".format(
102 result, kind, type(result)))
103 if kind == '#UNSERIALIZABLE':
104 return RemoteError('Unserializable message: %s\n' % result)
105 else:
106 return RemoteError(result)
107 else:
108 return ValueError('Unrecognized message type {!r}'.format(kind))
109
110 class ESC[4;38;5;81mRemoteError(ESC[4;38;5;149mException):
111 def __str__(self):
112 return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75)
113
114 #
115 # Functions for finding the method names of an object
116 #
117
118 def all_methods(obj):
119 '''
120 Return a list of names of methods of `obj`
121 '''
122 temp = []
123 for name in dir(obj):
124 func = getattr(obj, name)
125 if callable(func):
126 temp.append(name)
127 return temp
128
129 def public_methods(obj):
130 '''
131 Return a list of names of methods of `obj` which do not start with '_'
132 '''
133 return [name for name in all_methods(obj) if name[0] != '_']
134
135 #
136 # Server which is run in a process controlled by a manager
137 #
138
139 class ESC[4;38;5;81mServer(ESC[4;38;5;149mobject):
140 '''
141 Server class which runs in a process controlled by a manager object
142 '''
143 public = ['shutdown', 'create', 'accept_connection', 'get_methods',
144 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
145
146 def __init__(self, registry, address, authkey, serializer):
147 if not isinstance(authkey, bytes):
148 raise TypeError(
149 "Authkey {0!r} is type {1!s}, not bytes".format(
150 authkey, type(authkey)))
151 self.registry = registry
152 self.authkey = process.AuthenticationString(authkey)
153 Listener, Client = listener_client[serializer]
154
155 # do authentication later
156 self.listener = Listener(address=address, backlog=16)
157 self.address = self.listener.address
158
159 self.id_to_obj = {'0': (None, ())}
160 self.id_to_refcount = {}
161 self.id_to_local_proxy_obj = {}
162 self.mutex = threading.Lock()
163
164 def serve_forever(self):
165 '''
166 Run the server forever
167 '''
168 self.stop_event = threading.Event()
169 process.current_process()._manager_server = self
170 try:
171 accepter = threading.Thread(target=self.accepter)
172 accepter.daemon = True
173 accepter.start()
174 try:
175 while not self.stop_event.is_set():
176 self.stop_event.wait(1)
177 except (KeyboardInterrupt, SystemExit):
178 pass
179 finally:
180 if sys.stdout != sys.__stdout__: # what about stderr?
181 util.debug('resetting stdout, stderr')
182 sys.stdout = sys.__stdout__
183 sys.stderr = sys.__stderr__
184 sys.exit(0)
185
186 def accepter(self):
187 while True:
188 try:
189 c = self.listener.accept()
190 except OSError:
191 continue
192 t = threading.Thread(target=self.handle_request, args=(c,))
193 t.daemon = True
194 t.start()
195
196 def _handle_request(self, c):
197 request = None
198 try:
199 connection.deliver_challenge(c, self.authkey)
200 connection.answer_challenge(c, self.authkey)
201 request = c.recv()
202 ignore, funcname, args, kwds = request
203 assert funcname in self.public, '%r unrecognized' % funcname
204 func = getattr(self, funcname)
205 except Exception:
206 msg = ('#TRACEBACK', format_exc())
207 else:
208 try:
209 result = func(c, *args, **kwds)
210 except Exception:
211 msg = ('#TRACEBACK', format_exc())
212 else:
213 msg = ('#RETURN', result)
214
215 try:
216 c.send(msg)
217 except Exception as e:
218 try:
219 c.send(('#TRACEBACK', format_exc()))
220 except Exception:
221 pass
222 util.info('Failure to send message: %r', msg)
223 util.info(' ... request was %r', request)
224 util.info(' ... exception was %r', e)
225
226 def handle_request(self, conn):
227 '''
228 Handle a new connection
229 '''
230 try:
231 self._handle_request(conn)
232 except SystemExit:
233 # Server.serve_client() calls sys.exit(0) on EOF
234 pass
235 finally:
236 conn.close()
237
238 def serve_client(self, conn):
239 '''
240 Handle requests from the proxies in a particular process/thread
241 '''
242 util.debug('starting server thread to service %r',
243 threading.current_thread().name)
244
245 recv = conn.recv
246 send = conn.send
247 id_to_obj = self.id_to_obj
248
249 while not self.stop_event.is_set():
250
251 try:
252 methodname = obj = None
253 request = recv()
254 ident, methodname, args, kwds = request
255 try:
256 obj, exposed, gettypeid = id_to_obj[ident]
257 except KeyError as ke:
258 try:
259 obj, exposed, gettypeid = \
260 self.id_to_local_proxy_obj[ident]
261 except KeyError:
262 raise ke
263
264 if methodname not in exposed:
265 raise AttributeError(
266 'method %r of %r object is not in exposed=%r' %
267 (methodname, type(obj), exposed)
268 )
269
270 function = getattr(obj, methodname)
271
272 try:
273 res = function(*args, **kwds)
274 except Exception as e:
275 msg = ('#ERROR', e)
276 else:
277 typeid = gettypeid and gettypeid.get(methodname, None)
278 if typeid:
279 rident, rexposed = self.create(conn, typeid, res)
280 token = Token(typeid, self.address, rident)
281 msg = ('#PROXY', (rexposed, token))
282 else:
283 msg = ('#RETURN', res)
284
285 except AttributeError:
286 if methodname is None:
287 msg = ('#TRACEBACK', format_exc())
288 else:
289 try:
290 fallback_func = self.fallback_mapping[methodname]
291 result = fallback_func(
292 self, conn, ident, obj, *args, **kwds
293 )
294 msg = ('#RETURN', result)
295 except Exception:
296 msg = ('#TRACEBACK', format_exc())
297
298 except EOFError:
299 util.debug('got EOF -- exiting thread serving %r',
300 threading.current_thread().name)
301 sys.exit(0)
302
303 except Exception:
304 msg = ('#TRACEBACK', format_exc())
305
306 try:
307 try:
308 send(msg)
309 except Exception:
310 send(('#UNSERIALIZABLE', format_exc()))
311 except Exception as e:
312 util.info('exception in thread serving %r',
313 threading.current_thread().name)
314 util.info(' ... message was %r', msg)
315 util.info(' ... exception was %r', e)
316 conn.close()
317 sys.exit(1)
318
319 def fallback_getvalue(self, conn, ident, obj):
320 return obj
321
322 def fallback_str(self, conn, ident, obj):
323 return str(obj)
324
325 def fallback_repr(self, conn, ident, obj):
326 return repr(obj)
327
328 fallback_mapping = {
329 '__str__':fallback_str,
330 '__repr__':fallback_repr,
331 '#GETVALUE':fallback_getvalue
332 }
333
334 def dummy(self, c):
335 pass
336
337 def debug_info(self, c):
338 '''
339 Return some info --- useful to spot problems with refcounting
340 '''
341 # Perhaps include debug info about 'c'?
342 with self.mutex:
343 result = []
344 keys = list(self.id_to_refcount.keys())
345 keys.sort()
346 for ident in keys:
347 if ident != '0':
348 result.append(' %s: refcount=%s\n %s' %
349 (ident, self.id_to_refcount[ident],
350 str(self.id_to_obj[ident][0])[:75]))
351 return '\n'.join(result)
352
353 def number_of_objects(self, c):
354 '''
355 Number of shared objects
356 '''
357 # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
358 return len(self.id_to_refcount)
359
360 def shutdown(self, c):
361 '''
362 Shutdown this process
363 '''
364 try:
365 util.debug('manager received shutdown message')
366 c.send(('#RETURN', None))
367 except:
368 import traceback
369 traceback.print_exc()
370 finally:
371 self.stop_event.set()
372
373 def create(self, c, typeid, /, *args, **kwds):
374 '''
375 Create a new shared object and return its id
376 '''
377 with self.mutex:
378 callable, exposed, method_to_typeid, proxytype = \
379 self.registry[typeid]
380
381 if callable is None:
382 if kwds or (len(args) != 1):
383 raise ValueError(
384 "Without callable, must have one non-keyword argument")
385 obj = args[0]
386 else:
387 obj = callable(*args, **kwds)
388
389 if exposed is None:
390 exposed = public_methods(obj)
391 if method_to_typeid is not None:
392 if not isinstance(method_to_typeid, dict):
393 raise TypeError(
394 "Method_to_typeid {0!r}: type {1!s}, not dict".format(
395 method_to_typeid, type(method_to_typeid)))
396 exposed = list(exposed) + list(method_to_typeid)
397
398 ident = '%x' % id(obj) # convert to string because xmlrpclib
399 # only has 32 bit signed integers
400 util.debug('%r callable returned object with id %r', typeid, ident)
401
402 self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
403 if ident not in self.id_to_refcount:
404 self.id_to_refcount[ident] = 0
405
406 self.incref(c, ident)
407 return ident, tuple(exposed)
408
409 def get_methods(self, c, token):
410 '''
411 Return the methods of the shared object indicated by token
412 '''
413 return tuple(self.id_to_obj[token.id][1])
414
415 def accept_connection(self, c, name):
416 '''
417 Spawn a new thread to serve this connection
418 '''
419 threading.current_thread().name = name
420 c.send(('#RETURN', None))
421 self.serve_client(c)
422
423 def incref(self, c, ident):
424 with self.mutex:
425 try:
426 self.id_to_refcount[ident] += 1
427 except KeyError as ke:
428 # If no external references exist but an internal (to the
429 # manager) still does and a new external reference is created
430 # from it, restore the manager's tracking of it from the
431 # previously stashed internal ref.
432 if ident in self.id_to_local_proxy_obj:
433 self.id_to_refcount[ident] = 1
434 self.id_to_obj[ident] = \
435 self.id_to_local_proxy_obj[ident]
436 obj, exposed, gettypeid = self.id_to_obj[ident]
437 util.debug('Server re-enabled tracking & INCREF %r', ident)
438 else:
439 raise ke
440
441 def decref(self, c, ident):
442 if ident not in self.id_to_refcount and \
443 ident in self.id_to_local_proxy_obj:
444 util.debug('Server DECREF skipping %r', ident)
445 return
446
447 with self.mutex:
448 if self.id_to_refcount[ident] <= 0:
449 raise AssertionError(
450 "Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format(
451 ident, self.id_to_obj[ident],
452 self.id_to_refcount[ident]))
453 self.id_to_refcount[ident] -= 1
454 if self.id_to_refcount[ident] == 0:
455 del self.id_to_refcount[ident]
456
457 if ident not in self.id_to_refcount:
458 # Two-step process in case the object turns out to contain other
459 # proxy objects (e.g. a managed list of managed lists).
460 # Otherwise, deleting self.id_to_obj[ident] would trigger the
461 # deleting of the stored value (another managed object) which would
462 # in turn attempt to acquire the mutex that is already held here.
463 self.id_to_obj[ident] = (None, (), None) # thread-safe
464 util.debug('disposing of obj with id %r', ident)
465 with self.mutex:
466 del self.id_to_obj[ident]
467
468
469 #
470 # Class to represent state of a manager
471 #
472
473 class ESC[4;38;5;81mState(ESC[4;38;5;149mobject):
474 __slots__ = ['value']
475 INITIAL = 0
476 STARTED = 1
477 SHUTDOWN = 2
478
479 #
480 # Mapping from serializer name to Listener and Client types
481 #
482
483 listener_client = {
484 'pickle' : (connection.Listener, connection.Client),
485 'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
486 }
487
488 #
489 # Definition of BaseManager
490 #
491
492 class ESC[4;38;5;81mBaseManager(ESC[4;38;5;149mobject):
493 '''
494 Base class for managers
495 '''
496 _registry = {}
497 _Server = Server
498
499 def __init__(self, address=None, authkey=None, serializer='pickle',
500 ctx=None, *, shutdown_timeout=1.0):
501 if authkey is None:
502 authkey = process.current_process().authkey
503 self._address = address # XXX not final address if eg ('', 0)
504 self._authkey = process.AuthenticationString(authkey)
505 self._state = State()
506 self._state.value = State.INITIAL
507 self._serializer = serializer
508 self._Listener, self._Client = listener_client[serializer]
509 self._ctx = ctx or get_context()
510 self._shutdown_timeout = shutdown_timeout
511
512 def get_server(self):
513 '''
514 Return server object with serve_forever() method and address attribute
515 '''
516 if self._state.value != State.INITIAL:
517 if self._state.value == State.STARTED:
518 raise ProcessError("Already started server")
519 elif self._state.value == State.SHUTDOWN:
520 raise ProcessError("Manager has shut down")
521 else:
522 raise ProcessError(
523 "Unknown state {!r}".format(self._state.value))
524 return Server(self._registry, self._address,
525 self._authkey, self._serializer)
526
527 def connect(self):
528 '''
529 Connect manager object to the server process
530 '''
531 Listener, Client = listener_client[self._serializer]
532 conn = Client(self._address, authkey=self._authkey)
533 dispatch(conn, None, 'dummy')
534 self._state.value = State.STARTED
535
536 def start(self, initializer=None, initargs=()):
537 '''
538 Spawn a server process for this manager object
539 '''
540 if self._state.value != State.INITIAL:
541 if self._state.value == State.STARTED:
542 raise ProcessError("Already started server")
543 elif self._state.value == State.SHUTDOWN:
544 raise ProcessError("Manager has shut down")
545 else:
546 raise ProcessError(
547 "Unknown state {!r}".format(self._state.value))
548
549 if initializer is not None and not callable(initializer):
550 raise TypeError('initializer must be a callable')
551
552 # pipe over which we will retrieve address of server
553 reader, writer = connection.Pipe(duplex=False)
554
555 # spawn process which runs a server
556 self._process = self._ctx.Process(
557 target=type(self)._run_server,
558 args=(self._registry, self._address, self._authkey,
559 self._serializer, writer, initializer, initargs),
560 )
561 ident = ':'.join(str(i) for i in self._process._identity)
562 self._process.name = type(self).__name__ + '-' + ident
563 self._process.start()
564
565 # get address of server
566 writer.close()
567 self._address = reader.recv()
568 reader.close()
569
570 # register a finalizer
571 self._state.value = State.STARTED
572 self.shutdown = util.Finalize(
573 self, type(self)._finalize_manager,
574 args=(self._process, self._address, self._authkey, self._state,
575 self._Client, self._shutdown_timeout),
576 exitpriority=0
577 )
578
579 @classmethod
580 def _run_server(cls, registry, address, authkey, serializer, writer,
581 initializer=None, initargs=()):
582 '''
583 Create a server, report its address and run it
584 '''
585 # bpo-36368: protect server process from KeyboardInterrupt signals
586 signal.signal(signal.SIGINT, signal.SIG_IGN)
587
588 if initializer is not None:
589 initializer(*initargs)
590
591 # create server
592 server = cls._Server(registry, address, authkey, serializer)
593
594 # inform parent process of the server's address
595 writer.send(server.address)
596 writer.close()
597
598 # run the manager
599 util.info('manager serving at %r', server.address)
600 server.serve_forever()
601
602 def _create(self, typeid, /, *args, **kwds):
603 '''
604 Create a new shared object; return the token and exposed tuple
605 '''
606 assert self._state.value == State.STARTED, 'server not yet started'
607 conn = self._Client(self._address, authkey=self._authkey)
608 try:
609 id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
610 finally:
611 conn.close()
612 return Token(typeid, self._address, id), exposed
613
614 def join(self, timeout=None):
615 '''
616 Join the manager process (if it has been spawned)
617 '''
618 if self._process is not None:
619 self._process.join(timeout)
620 if not self._process.is_alive():
621 self._process = None
622
623 def _debug_info(self):
624 '''
625 Return some info about the servers shared objects and connections
626 '''
627 conn = self._Client(self._address, authkey=self._authkey)
628 try:
629 return dispatch(conn, None, 'debug_info')
630 finally:
631 conn.close()
632
633 def _number_of_objects(self):
634 '''
635 Return the number of shared objects
636 '''
637 conn = self._Client(self._address, authkey=self._authkey)
638 try:
639 return dispatch(conn, None, 'number_of_objects')
640 finally:
641 conn.close()
642
643 def __enter__(self):
644 if self._state.value == State.INITIAL:
645 self.start()
646 if self._state.value != State.STARTED:
647 if self._state.value == State.INITIAL:
648 raise ProcessError("Unable to start server")
649 elif self._state.value == State.SHUTDOWN:
650 raise ProcessError("Manager has shut down")
651 else:
652 raise ProcessError(
653 "Unknown state {!r}".format(self._state.value))
654 return self
655
656 def __exit__(self, exc_type, exc_val, exc_tb):
657 self.shutdown()
658
659 @staticmethod
660 def _finalize_manager(process, address, authkey, state, _Client,
661 shutdown_timeout):
662 '''
663 Shutdown the manager process; will be registered as a finalizer
664 '''
665 if process.is_alive():
666 util.info('sending shutdown message to manager')
667 try:
668 conn = _Client(address, authkey=authkey)
669 try:
670 dispatch(conn, None, 'shutdown')
671 finally:
672 conn.close()
673 except Exception:
674 pass
675
676 process.join(timeout=shutdown_timeout)
677 if process.is_alive():
678 util.info('manager still alive')
679 if hasattr(process, 'terminate'):
680 util.info('trying to `terminate()` manager process')
681 process.terminate()
682 process.join(timeout=shutdown_timeout)
683 if process.is_alive():
684 util.info('manager still alive after terminate')
685 process.kill()
686 process.join()
687
688 state.value = State.SHUTDOWN
689 try:
690 del BaseProxy._address_to_local[address]
691 except KeyError:
692 pass
693
694 @property
695 def address(self):
696 return self._address
697
698 @classmethod
699 def register(cls, typeid, callable=None, proxytype=None, exposed=None,
700 method_to_typeid=None, create_method=True):
701 '''
702 Register a typeid with the manager type
703 '''
704 if '_registry' not in cls.__dict__:
705 cls._registry = cls._registry.copy()
706
707 if proxytype is None:
708 proxytype = AutoProxy
709
710 exposed = exposed or getattr(proxytype, '_exposed_', None)
711
712 method_to_typeid = method_to_typeid or \
713 getattr(proxytype, '_method_to_typeid_', None)
714
715 if method_to_typeid:
716 for key, value in list(method_to_typeid.items()): # isinstance?
717 assert type(key) is str, '%r is not a string' % key
718 assert type(value) is str, '%r is not a string' % value
719
720 cls._registry[typeid] = (
721 callable, exposed, method_to_typeid, proxytype
722 )
723
724 if create_method:
725 def temp(self, /, *args, **kwds):
726 util.debug('requesting creation of a shared %r object', typeid)
727 token, exp = self._create(typeid, *args, **kwds)
728 proxy = proxytype(
729 token, self._serializer, manager=self,
730 authkey=self._authkey, exposed=exp
731 )
732 conn = self._Client(token.address, authkey=self._authkey)
733 dispatch(conn, None, 'decref', (token.id,))
734 return proxy
735 temp.__name__ = typeid
736 setattr(cls, typeid, temp)
737
738 #
739 # Subclass of set which get cleared after a fork
740 #
741
742 class ESC[4;38;5;81mProcessLocalSet(ESC[4;38;5;149mset):
743 def __init__(self):
744 util.register_after_fork(self, lambda obj: obj.clear())
745 def __reduce__(self):
746 return type(self), ()
747
748 #
749 # Definition of BaseProxy
750 #
751
752 class ESC[4;38;5;81mBaseProxy(ESC[4;38;5;149mobject):
753 '''
754 A base for proxies of shared objects
755 '''
756 _address_to_local = {}
757 _mutex = util.ForkAwareThreadLock()
758
759 def __init__(self, token, serializer, manager=None,
760 authkey=None, exposed=None, incref=True, manager_owned=False):
761 with BaseProxy._mutex:
762 tls_idset = BaseProxy._address_to_local.get(token.address, None)
763 if tls_idset is None:
764 tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
765 BaseProxy._address_to_local[token.address] = tls_idset
766
767 # self._tls is used to record the connection used by this
768 # thread to communicate with the manager at token.address
769 self._tls = tls_idset[0]
770
771 # self._idset is used to record the identities of all shared
772 # objects for which the current process owns references and
773 # which are in the manager at token.address
774 self._idset = tls_idset[1]
775
776 self._token = token
777 self._id = self._token.id
778 self._manager = manager
779 self._serializer = serializer
780 self._Client = listener_client[serializer][1]
781
782 # Should be set to True only when a proxy object is being created
783 # on the manager server; primary use case: nested proxy objects.
784 # RebuildProxy detects when a proxy is being created on the manager
785 # and sets this value appropriately.
786 self._owned_by_manager = manager_owned
787
788 if authkey is not None:
789 self._authkey = process.AuthenticationString(authkey)
790 elif self._manager is not None:
791 self._authkey = self._manager._authkey
792 else:
793 self._authkey = process.current_process().authkey
794
795 if incref:
796 self._incref()
797
798 util.register_after_fork(self, BaseProxy._after_fork)
799
800 def _connect(self):
801 util.debug('making connection to manager')
802 name = process.current_process().name
803 if threading.current_thread().name != 'MainThread':
804 name += '|' + threading.current_thread().name
805 conn = self._Client(self._token.address, authkey=self._authkey)
806 dispatch(conn, None, 'accept_connection', (name,))
807 self._tls.connection = conn
808
809 def _callmethod(self, methodname, args=(), kwds={}):
810 '''
811 Try to call a method of the referent and return a copy of the result
812 '''
813 try:
814 conn = self._tls.connection
815 except AttributeError:
816 util.debug('thread %r does not own a connection',
817 threading.current_thread().name)
818 self._connect()
819 conn = self._tls.connection
820
821 conn.send((self._id, methodname, args, kwds))
822 kind, result = conn.recv()
823
824 if kind == '#RETURN':
825 return result
826 elif kind == '#PROXY':
827 exposed, token = result
828 proxytype = self._manager._registry[token.typeid][-1]
829 token.address = self._token.address
830 proxy = proxytype(
831 token, self._serializer, manager=self._manager,
832 authkey=self._authkey, exposed=exposed
833 )
834 conn = self._Client(token.address, authkey=self._authkey)
835 dispatch(conn, None, 'decref', (token.id,))
836 return proxy
837 raise convert_to_error(kind, result)
838
839 def _getvalue(self):
840 '''
841 Get a copy of the value of the referent
842 '''
843 return self._callmethod('#GETVALUE')
844
845 def _incref(self):
846 if self._owned_by_manager:
847 util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
848 return
849
850 conn = self._Client(self._token.address, authkey=self._authkey)
851 dispatch(conn, None, 'incref', (self._id,))
852 util.debug('INCREF %r', self._token.id)
853
854 self._idset.add(self._id)
855
856 state = self._manager and self._manager._state
857
858 self._close = util.Finalize(
859 self, BaseProxy._decref,
860 args=(self._token, self._authkey, state,
861 self._tls, self._idset, self._Client),
862 exitpriority=10
863 )
864
865 @staticmethod
866 def _decref(token, authkey, state, tls, idset, _Client):
867 idset.discard(token.id)
868
869 # check whether manager is still alive
870 if state is None or state.value == State.STARTED:
871 # tell manager this process no longer cares about referent
872 try:
873 util.debug('DECREF %r', token.id)
874 conn = _Client(token.address, authkey=authkey)
875 dispatch(conn, None, 'decref', (token.id,))
876 except Exception as e:
877 util.debug('... decref failed %s', e)
878
879 else:
880 util.debug('DECREF %r -- manager already shutdown', token.id)
881
882 # check whether we can close this thread's connection because
883 # the process owns no more references to objects for this manager
884 if not idset and hasattr(tls, 'connection'):
885 util.debug('thread %r has no more proxies so closing conn',
886 threading.current_thread().name)
887 tls.connection.close()
888 del tls.connection
889
890 def _after_fork(self):
891 self._manager = None
892 try:
893 self._incref()
894 except Exception as e:
895 # the proxy may just be for a manager which has shutdown
896 util.info('incref failed: %s' % e)
897
898 def __reduce__(self):
899 kwds = {}
900 if get_spawning_popen() is not None:
901 kwds['authkey'] = self._authkey
902
903 if getattr(self, '_isauto', False):
904 kwds['exposed'] = self._exposed_
905 return (RebuildProxy,
906 (AutoProxy, self._token, self._serializer, kwds))
907 else:
908 return (RebuildProxy,
909 (type(self), self._token, self._serializer, kwds))
910
911 def __deepcopy__(self, memo):
912 return self._getvalue()
913
914 def __repr__(self):
915 return '<%s object, typeid %r at %#x>' % \
916 (type(self).__name__, self._token.typeid, id(self))
917
918 def __str__(self):
919 '''
920 Return representation of the referent (or a fall-back if that fails)
921 '''
922 try:
923 return self._callmethod('__repr__')
924 except Exception:
925 return repr(self)[:-1] + "; '__str__()' failed>"
926
927 #
928 # Function used for unpickling
929 #
930
931 def RebuildProxy(func, token, serializer, kwds):
932 '''
933 Function used for unpickling proxy objects.
934 '''
935 server = getattr(process.current_process(), '_manager_server', None)
936 if server and server.address == token.address:
937 util.debug('Rebuild a proxy owned by manager, token=%r', token)
938 kwds['manager_owned'] = True
939 if token.id not in server.id_to_local_proxy_obj:
940 server.id_to_local_proxy_obj[token.id] = \
941 server.id_to_obj[token.id]
942 incref = (
943 kwds.pop('incref', True) and
944 not getattr(process.current_process(), '_inheriting', False)
945 )
946 return func(token, serializer, incref=incref, **kwds)
947
948 #
949 # Functions to create proxies and proxy types
950 #
951
952 def MakeProxyType(name, exposed, _cache={}):
953 '''
954 Return a proxy type whose methods are given by `exposed`
955 '''
956 exposed = tuple(exposed)
957 try:
958 return _cache[(name, exposed)]
959 except KeyError:
960 pass
961
962 dic = {}
963
964 for meth in exposed:
965 exec('''def %s(self, /, *args, **kwds):
966 return self._callmethod(%r, args, kwds)''' % (meth, meth), dic)
967
968 ProxyType = type(name, (BaseProxy,), dic)
969 ProxyType._exposed_ = exposed
970 _cache[(name, exposed)] = ProxyType
971 return ProxyType
972
973
974 def AutoProxy(token, serializer, manager=None, authkey=None,
975 exposed=None, incref=True, manager_owned=False):
976 '''
977 Return an auto-proxy for `token`
978 '''
979 _Client = listener_client[serializer][1]
980
981 if exposed is None:
982 conn = _Client(token.address, authkey=authkey)
983 try:
984 exposed = dispatch(conn, None, 'get_methods', (token,))
985 finally:
986 conn.close()
987
988 if authkey is None and manager is not None:
989 authkey = manager._authkey
990 if authkey is None:
991 authkey = process.current_process().authkey
992
993 ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
994 proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
995 incref=incref, manager_owned=manager_owned)
996 proxy._isauto = True
997 return proxy
998
999 #
1000 # Types/callables which we will register with SyncManager
1001 #
1002
1003 class ESC[4;38;5;81mNamespace(ESC[4;38;5;149mobject):
1004 def __init__(self, /, **kwds):
1005 self.__dict__.update(kwds)
1006 def __repr__(self):
1007 items = list(self.__dict__.items())
1008 temp = []
1009 for name, value in items:
1010 if not name.startswith('_'):
1011 temp.append('%s=%r' % (name, value))
1012 temp.sort()
1013 return '%s(%s)' % (self.__class__.__name__, ', '.join(temp))
1014
1015 class ESC[4;38;5;81mValue(ESC[4;38;5;149mobject):
1016 def __init__(self, typecode, value, lock=True):
1017 self._typecode = typecode
1018 self._value = value
1019 def get(self):
1020 return self._value
1021 def set(self, value):
1022 self._value = value
1023 def __repr__(self):
1024 return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
1025 value = property(get, set)
1026
1027 def Array(typecode, sequence, lock=True):
1028 return array.array(typecode, sequence)
1029
1030 #
1031 # Proxy types used by SyncManager
1032 #
1033
1034 class ESC[4;38;5;81mIteratorProxy(ESC[4;38;5;149mBaseProxy):
1035 _exposed_ = ('__next__', 'send', 'throw', 'close')
1036 def __iter__(self):
1037 return self
1038 def __next__(self, *args):
1039 return self._callmethod('__next__', args)
1040 def send(self, *args):
1041 return self._callmethod('send', args)
1042 def throw(self, *args):
1043 return self._callmethod('throw', args)
1044 def close(self, *args):
1045 return self._callmethod('close', args)
1046
1047
1048 class ESC[4;38;5;81mAcquirerProxy(ESC[4;38;5;149mBaseProxy):
1049 _exposed_ = ('acquire', 'release')
1050 def acquire(self, blocking=True, timeout=None):
1051 args = (blocking,) if timeout is None else (blocking, timeout)
1052 return self._callmethod('acquire', args)
1053 def release(self):
1054 return self._callmethod('release')
1055 def __enter__(self):
1056 return self._callmethod('acquire')
1057 def __exit__(self, exc_type, exc_val, exc_tb):
1058 return self._callmethod('release')
1059
1060
1061 class ESC[4;38;5;81mConditionProxy(ESC[4;38;5;149mAcquirerProxy):
1062 _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
1063 def wait(self, timeout=None):
1064 return self._callmethod('wait', (timeout,))
1065 def notify(self, n=1):
1066 return self._callmethod('notify', (n,))
1067 def notify_all(self):
1068 return self._callmethod('notify_all')
1069 def wait_for(self, predicate, timeout=None):
1070 result = predicate()
1071 if result:
1072 return result
1073 if timeout is not None:
1074 endtime = time.monotonic() + timeout
1075 else:
1076 endtime = None
1077 waittime = None
1078 while not result:
1079 if endtime is not None:
1080 waittime = endtime - time.monotonic()
1081 if waittime <= 0:
1082 break
1083 self.wait(waittime)
1084 result = predicate()
1085 return result
1086
1087
1088 class ESC[4;38;5;81mEventProxy(ESC[4;38;5;149mBaseProxy):
1089 _exposed_ = ('is_set', 'set', 'clear', 'wait')
1090 def is_set(self):
1091 return self._callmethod('is_set')
1092 def set(self):
1093 return self._callmethod('set')
1094 def clear(self):
1095 return self._callmethod('clear')
1096 def wait(self, timeout=None):
1097 return self._callmethod('wait', (timeout,))
1098
1099
1100 class ESC[4;38;5;81mBarrierProxy(ESC[4;38;5;149mBaseProxy):
1101 _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')
1102 def wait(self, timeout=None):
1103 return self._callmethod('wait', (timeout,))
1104 def abort(self):
1105 return self._callmethod('abort')
1106 def reset(self):
1107 return self._callmethod('reset')
1108 @property
1109 def parties(self):
1110 return self._callmethod('__getattribute__', ('parties',))
1111 @property
1112 def n_waiting(self):
1113 return self._callmethod('__getattribute__', ('n_waiting',))
1114 @property
1115 def broken(self):
1116 return self._callmethod('__getattribute__', ('broken',))
1117
1118
1119 class ESC[4;38;5;81mNamespaceProxy(ESC[4;38;5;149mBaseProxy):
1120 _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
1121 def __getattr__(self, key):
1122 if key[0] == '_':
1123 return object.__getattribute__(self, key)
1124 callmethod = object.__getattribute__(self, '_callmethod')
1125 return callmethod('__getattribute__', (key,))
1126 def __setattr__(self, key, value):
1127 if key[0] == '_':
1128 return object.__setattr__(self, key, value)
1129 callmethod = object.__getattribute__(self, '_callmethod')
1130 return callmethod('__setattr__', (key, value))
1131 def __delattr__(self, key):
1132 if key[0] == '_':
1133 return object.__delattr__(self, key)
1134 callmethod = object.__getattribute__(self, '_callmethod')
1135 return callmethod('__delattr__', (key,))
1136
1137
1138 class ESC[4;38;5;81mValueProxy(ESC[4;38;5;149mBaseProxy):
1139 _exposed_ = ('get', 'set')
1140 def get(self):
1141 return self._callmethod('get')
1142 def set(self, value):
1143 return self._callmethod('set', (value,))
1144 value = property(get, set)
1145
1146 __class_getitem__ = classmethod(types.GenericAlias)
1147
1148
1149 BaseListProxy = MakeProxyType('BaseListProxy', (
1150 '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
1151 '__mul__', '__reversed__', '__rmul__', '__setitem__',
1152 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
1153 'reverse', 'sort', '__imul__'
1154 ))
1155 class ESC[4;38;5;81mListProxy(ESC[4;38;5;149mBaseListProxy):
1156 def __iadd__(self, value):
1157 self._callmethod('extend', (value,))
1158 return self
1159 def __imul__(self, value):
1160 self._callmethod('__imul__', (value,))
1161 return self
1162
1163
1164 DictProxy = MakeProxyType('DictProxy', (
1165 '__contains__', '__delitem__', '__getitem__', '__iter__', '__len__',
1166 '__setitem__', 'clear', 'copy', 'get', 'items',
1167 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
1168 ))
1169 DictProxy._method_to_typeid_ = {
1170 '__iter__': 'Iterator',
1171 }
1172
1173
1174 ArrayProxy = MakeProxyType('ArrayProxy', (
1175 '__len__', '__getitem__', '__setitem__'
1176 ))
1177
1178
1179 BasePoolProxy = MakeProxyType('PoolProxy', (
1180 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
1181 'map', 'map_async', 'starmap', 'starmap_async', 'terminate',
1182 ))
1183 BasePoolProxy._method_to_typeid_ = {
1184 'apply_async': 'AsyncResult',
1185 'map_async': 'AsyncResult',
1186 'starmap_async': 'AsyncResult',
1187 'imap': 'Iterator',
1188 'imap_unordered': 'Iterator'
1189 }
1190 class ESC[4;38;5;81mPoolProxy(ESC[4;38;5;149mBasePoolProxy):
1191 def __enter__(self):
1192 return self
1193 def __exit__(self, exc_type, exc_val, exc_tb):
1194 self.terminate()
1195
1196 #
1197 # Definition of SyncManager
1198 #
1199
1200 class ESC[4;38;5;81mSyncManager(ESC[4;38;5;149mBaseManager):
1201 '''
1202 Subclass of `BaseManager` which supports a number of shared object types.
1203
1204 The types registered are those intended for the synchronization
1205 of threads, plus `dict`, `list` and `Namespace`.
1206
1207 The `multiprocessing.Manager()` function creates started instances of
1208 this class.
1209 '''
1210
1211 SyncManager.register('Queue', queue.Queue)
1212 SyncManager.register('JoinableQueue', queue.Queue)
1213 SyncManager.register('Event', threading.Event, EventProxy)
1214 SyncManager.register('Lock', threading.Lock, AcquirerProxy)
1215 SyncManager.register('RLock', threading.RLock, AcquirerProxy)
1216 SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
1217 SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
1218 AcquirerProxy)
1219 SyncManager.register('Condition', threading.Condition, ConditionProxy)
1220 SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
1221 SyncManager.register('Pool', pool.Pool, PoolProxy)
1222 SyncManager.register('list', list, ListProxy)
1223 SyncManager.register('dict', dict, DictProxy)
1224 SyncManager.register('Value', Value, ValueProxy)
1225 SyncManager.register('Array', Array, ArrayProxy)
1226 SyncManager.register('Namespace', Namespace, NamespaceProxy)
1227
1228 # types returned by methods of PoolProxy
1229 SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
1230 SyncManager.register('AsyncResult', create_method=False)
1231
1232 #
1233 # Definition of SharedMemoryManager and SharedMemoryServer
1234 #
1235
1236 if HAS_SHMEM:
1237 class ESC[4;38;5;81m_SharedMemoryTracker:
1238 "Manages one or more shared memory segments."
1239
1240 def __init__(self, name, segment_names=[]):
1241 self.shared_memory_context_name = name
1242 self.segment_names = segment_names
1243
1244 def register_segment(self, segment_name):
1245 "Adds the supplied shared memory block name to tracker."
1246 util.debug(f"Register segment {segment_name!r} in pid {getpid()}")
1247 self.segment_names.append(segment_name)
1248
1249 def destroy_segment(self, segment_name):
1250 """Calls unlink() on the shared memory block with the supplied name
1251 and removes it from the list of blocks being tracked."""
1252 util.debug(f"Destroy segment {segment_name!r} in pid {getpid()}")
1253 self.segment_names.remove(segment_name)
1254 segment = shared_memory.SharedMemory(segment_name)
1255 segment.close()
1256 segment.unlink()
1257
1258 def unlink(self):
1259 "Calls destroy_segment() on all tracked shared memory blocks."
1260 for segment_name in self.segment_names[:]:
1261 self.destroy_segment(segment_name)
1262
1263 def __del__(self):
1264 util.debug(f"Call {self.__class__.__name__}.__del__ in {getpid()}")
1265 self.unlink()
1266
1267 def __getstate__(self):
1268 return (self.shared_memory_context_name, self.segment_names)
1269
1270 def __setstate__(self, state):
1271 self.__init__(*state)
1272
1273
1274 class ESC[4;38;5;81mSharedMemoryServer(ESC[4;38;5;149mServer):
1275
1276 public = Server.public + \
1277 ['track_segment', 'release_segment', 'list_segments']
1278
1279 def __init__(self, *args, **kwargs):
1280 Server.__init__(self, *args, **kwargs)
1281 address = self.address
1282 # The address of Linux abstract namespaces can be bytes
1283 if isinstance(address, bytes):
1284 address = os.fsdecode(address)
1285 self.shared_memory_context = \
1286 _SharedMemoryTracker(f"shm_{address}_{getpid()}")
1287 util.debug(f"SharedMemoryServer started by pid {getpid()}")
1288
1289 def create(self, c, typeid, /, *args, **kwargs):
1290 """Create a new distributed-shared object (not backed by a shared
1291 memory block) and return its id to be used in a Proxy Object."""
1292 # Unless set up as a shared proxy, don't make shared_memory_context
1293 # a standard part of kwargs. This makes things easier for supplying
1294 # simple functions.
1295 if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"):
1296 kwargs['shared_memory_context'] = self.shared_memory_context
1297 return Server.create(self, c, typeid, *args, **kwargs)
1298
1299 def shutdown(self, c):
1300 "Call unlink() on all tracked shared memory, terminate the Server."
1301 self.shared_memory_context.unlink()
1302 return Server.shutdown(self, c)
1303
1304 def track_segment(self, c, segment_name):
1305 "Adds the supplied shared memory block name to Server's tracker."
1306 self.shared_memory_context.register_segment(segment_name)
1307
1308 def release_segment(self, c, segment_name):
1309 """Calls unlink() on the shared memory block with the supplied name
1310 and removes it from the tracker instance inside the Server."""
1311 self.shared_memory_context.destroy_segment(segment_name)
1312
1313 def list_segments(self, c):
1314 """Returns a list of names of shared memory blocks that the Server
1315 is currently tracking."""
1316 return self.shared_memory_context.segment_names
1317
1318
1319 class ESC[4;38;5;81mSharedMemoryManager(ESC[4;38;5;149mBaseManager):
1320 """Like SyncManager but uses SharedMemoryServer instead of Server.
1321
1322 It provides methods for creating and returning SharedMemory instances
1323 and for creating a list-like object (ShareableList) backed by shared
1324 memory. It also provides methods that create and return Proxy Objects
1325 that support synchronization across processes (i.e. multi-process-safe
1326 locks and semaphores).
1327 """
1328
1329 _Server = SharedMemoryServer
1330
1331 def __init__(self, *args, **kwargs):
1332 if os.name == "posix":
1333 # bpo-36867: Ensure the resource_tracker is running before
1334 # launching the manager process, so that concurrent
1335 # shared_memory manipulation both in the manager and in the
1336 # current process does not create two resource_tracker
1337 # processes.
1338 from . import resource_tracker
1339 resource_tracker.ensure_running()
1340 BaseManager.__init__(self, *args, **kwargs)
1341 util.debug(f"{self.__class__.__name__} created by pid {getpid()}")
1342
1343 def __del__(self):
1344 util.debug(f"{self.__class__.__name__}.__del__ by pid {getpid()}")
1345
1346 def get_server(self):
1347 'Better than monkeypatching for now; merge into Server ultimately'
1348 if self._state.value != State.INITIAL:
1349 if self._state.value == State.STARTED:
1350 raise ProcessError("Already started SharedMemoryServer")
1351 elif self._state.value == State.SHUTDOWN:
1352 raise ProcessError("SharedMemoryManager has shut down")
1353 else:
1354 raise ProcessError(
1355 "Unknown state {!r}".format(self._state.value))
1356 return self._Server(self._registry, self._address,
1357 self._authkey, self._serializer)
1358
1359 def SharedMemory(self, size):
1360 """Returns a new SharedMemory instance with the specified size in
1361 bytes, to be tracked by the manager."""
1362 with self._Client(self._address, authkey=self._authkey) as conn:
1363 sms = shared_memory.SharedMemory(None, create=True, size=size)
1364 try:
1365 dispatch(conn, None, 'track_segment', (sms.name,))
1366 except BaseException as e:
1367 sms.unlink()
1368 raise e
1369 return sms
1370
1371 def ShareableList(self, sequence):
1372 """Returns a new ShareableList instance populated with the values
1373 from the input sequence, to be tracked by the manager."""
1374 with self._Client(self._address, authkey=self._authkey) as conn:
1375 sl = shared_memory.ShareableList(sequence)
1376 try:
1377 dispatch(conn, None, 'track_segment', (sl.shm.name,))
1378 except BaseException as e:
1379 sl.shm.unlink()
1380 raise e
1381 return sl