(root)/
Python-3.12.0/
Lib/
multiprocessing/
managers.py
       1  #
       2  # Module providing manager classes for dealing
       3  # with shared objects
       4  #
       5  # multiprocessing/managers.py
       6  #
       7  # Copyright (c) 2006-2008, R Oudkerk
       8  # Licensed to PSF under a Contributor Agreement.
       9  #
      10  
      11  __all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
      12  
      13  #
      14  # Imports
      15  #
      16  
      17  import sys
      18  import threading
      19  import signal
      20  import array
      21  import queue
      22  import time
      23  import types
      24  import os
      25  from os import getpid
      26  
      27  from traceback import format_exc
      28  
      29  from . import connection
      30  from .context import reduction, get_spawning_popen, ProcessError
      31  from . import pool
      32  from . import process
      33  from . import util
      34  from . import get_context
      35  try:
      36      from . import shared_memory
      37  except ImportError:
      38      HAS_SHMEM = False
      39  else:
      40      HAS_SHMEM = True
      41      __all__.append('SharedMemoryManager')
      42  
      43  #
      44  # Register some things for pickling
      45  #
      46  
      47  def reduce_array(a):
      48      return array.array, (a.typecode, a.tobytes())
      49  reduction.register(array.array, reduce_array)
      50  
      51  view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
      52  def rebuild_as_list(obj):
      53      return list, (list(obj),)
      54  for view_type in view_types:
      55      reduction.register(view_type, rebuild_as_list)
      56  del view_type, view_types
      57  
      58  #
      59  # Type for identifying shared objects
      60  #
      61  
      62  class ESC[4;38;5;81mToken(ESC[4;38;5;149mobject):
      63      '''
      64      Type to uniquely identify a shared object
      65      '''
      66      __slots__ = ('typeid', 'address', 'id')
      67  
      68      def __init__(self, typeid, address, id):
      69          (self.typeid, self.address, self.id) = (typeid, address, id)
      70  
      71      def __getstate__(self):
      72          return (self.typeid, self.address, self.id)
      73  
      74      def __setstate__(self, state):
      75          (self.typeid, self.address, self.id) = state
      76  
      77      def __repr__(self):
      78          return '%s(typeid=%r, address=%r, id=%r)' % \
      79                 (self.__class__.__name__, self.typeid, self.address, self.id)
      80  
      81  #
      82  # Function for communication with a manager's server process
      83  #
      84  
      85  def dispatch(c, id, methodname, args=(), kwds={}):
      86      '''
      87      Send a message to manager using connection `c` and return response
      88      '''
      89      c.send((id, methodname, args, kwds))
      90      kind, result = c.recv()
      91      if kind == '#RETURN':
      92          return result
      93      raise convert_to_error(kind, result)
      94  
      95  def convert_to_error(kind, result):
      96      if kind == '#ERROR':
      97          return result
      98      elif kind in ('#TRACEBACK', '#UNSERIALIZABLE'):
      99          if not isinstance(result, str):
     100              raise TypeError(
     101                  "Result {0!r} (kind '{1}') type is {2}, not str".format(
     102                      result, kind, type(result)))
     103          if kind == '#UNSERIALIZABLE':
     104              return RemoteError('Unserializable message: %s\n' % result)
     105          else:
     106              return RemoteError(result)
     107      else:
     108          return ValueError('Unrecognized message type {!r}'.format(kind))
     109  
     110  class ESC[4;38;5;81mRemoteError(ESC[4;38;5;149mException):
     111      def __str__(self):
     112          return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75)
     113  
     114  #
     115  # Functions for finding the method names of an object
     116  #
     117  
     118  def all_methods(obj):
     119      '''
     120      Return a list of names of methods of `obj`
     121      '''
     122      temp = []
     123      for name in dir(obj):
     124          func = getattr(obj, name)
     125          if callable(func):
     126              temp.append(name)
     127      return temp
     128  
     129  def public_methods(obj):
     130      '''
     131      Return a list of names of methods of `obj` which do not start with '_'
     132      '''
     133      return [name for name in all_methods(obj) if name[0] != '_']
     134  
     135  #
     136  # Server which is run in a process controlled by a manager
     137  #
     138  
     139  class ESC[4;38;5;81mServer(ESC[4;38;5;149mobject):
     140      '''
     141      Server class which runs in a process controlled by a manager object
     142      '''
     143      public = ['shutdown', 'create', 'accept_connection', 'get_methods',
     144                'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
     145  
     146      def __init__(self, registry, address, authkey, serializer):
     147          if not isinstance(authkey, bytes):
     148              raise TypeError(
     149                  "Authkey {0!r} is type {1!s}, not bytes".format(
     150                      authkey, type(authkey)))
     151          self.registry = registry
     152          self.authkey = process.AuthenticationString(authkey)
     153          Listener, Client = listener_client[serializer]
     154  
     155          # do authentication later
     156          self.listener = Listener(address=address, backlog=16)
     157          self.address = self.listener.address
     158  
     159          self.id_to_obj = {'0': (None, ())}
     160          self.id_to_refcount = {}
     161          self.id_to_local_proxy_obj = {}
     162          self.mutex = threading.Lock()
     163  
     164      def serve_forever(self):
     165          '''
     166          Run the server forever
     167          '''
     168          self.stop_event = threading.Event()
     169          process.current_process()._manager_server = self
     170          try:
     171              accepter = threading.Thread(target=self.accepter)
     172              accepter.daemon = True
     173              accepter.start()
     174              try:
     175                  while not self.stop_event.is_set():
     176                      self.stop_event.wait(1)
     177              except (KeyboardInterrupt, SystemExit):
     178                  pass
     179          finally:
     180              if sys.stdout != sys.__stdout__: # what about stderr?
     181                  util.debug('resetting stdout, stderr')
     182                  sys.stdout = sys.__stdout__
     183                  sys.stderr = sys.__stderr__
     184              sys.exit(0)
     185  
     186      def accepter(self):
     187          while True:
     188              try:
     189                  c = self.listener.accept()
     190              except OSError:
     191                  continue
     192              t = threading.Thread(target=self.handle_request, args=(c,))
     193              t.daemon = True
     194              t.start()
     195  
     196      def _handle_request(self, c):
     197          request = None
     198          try:
     199              connection.deliver_challenge(c, self.authkey)
     200              connection.answer_challenge(c, self.authkey)
     201              request = c.recv()
     202              ignore, funcname, args, kwds = request
     203              assert funcname in self.public, '%r unrecognized' % funcname
     204              func = getattr(self, funcname)
     205          except Exception:
     206              msg = ('#TRACEBACK', format_exc())
     207          else:
     208              try:
     209                  result = func(c, *args, **kwds)
     210              except Exception:
     211                  msg = ('#TRACEBACK', format_exc())
     212              else:
     213                  msg = ('#RETURN', result)
     214  
     215          try:
     216              c.send(msg)
     217          except Exception as e:
     218              try:
     219                  c.send(('#TRACEBACK', format_exc()))
     220              except Exception:
     221                  pass
     222              util.info('Failure to send message: %r', msg)
     223              util.info(' ... request was %r', request)
     224              util.info(' ... exception was %r', e)
     225  
     226      def handle_request(self, conn):
     227          '''
     228          Handle a new connection
     229          '''
     230          try:
     231              self._handle_request(conn)
     232          except SystemExit:
     233              # Server.serve_client() calls sys.exit(0) on EOF
     234              pass
     235          finally:
     236              conn.close()
     237  
     238      def serve_client(self, conn):
     239          '''
     240          Handle requests from the proxies in a particular process/thread
     241          '''
     242          util.debug('starting server thread to service %r',
     243                     threading.current_thread().name)
     244  
     245          recv = conn.recv
     246          send = conn.send
     247          id_to_obj = self.id_to_obj
     248  
     249          while not self.stop_event.is_set():
     250  
     251              try:
     252                  methodname = obj = None
     253                  request = recv()
     254                  ident, methodname, args, kwds = request
     255                  try:
     256                      obj, exposed, gettypeid = id_to_obj[ident]
     257                  except KeyError as ke:
     258                      try:
     259                          obj, exposed, gettypeid = \
     260                              self.id_to_local_proxy_obj[ident]
     261                      except KeyError:
     262                          raise ke
     263  
     264                  if methodname not in exposed:
     265                      raise AttributeError(
     266                          'method %r of %r object is not in exposed=%r' %
     267                          (methodname, type(obj), exposed)
     268                          )
     269  
     270                  function = getattr(obj, methodname)
     271  
     272                  try:
     273                      res = function(*args, **kwds)
     274                  except Exception as e:
     275                      msg = ('#ERROR', e)
     276                  else:
     277                      typeid = gettypeid and gettypeid.get(methodname, None)
     278                      if typeid:
     279                          rident, rexposed = self.create(conn, typeid, res)
     280                          token = Token(typeid, self.address, rident)
     281                          msg = ('#PROXY', (rexposed, token))
     282                      else:
     283                          msg = ('#RETURN', res)
     284  
     285              except AttributeError:
     286                  if methodname is None:
     287                      msg = ('#TRACEBACK', format_exc())
     288                  else:
     289                      try:
     290                          fallback_func = self.fallback_mapping[methodname]
     291                          result = fallback_func(
     292                              self, conn, ident, obj, *args, **kwds
     293                              )
     294                          msg = ('#RETURN', result)
     295                      except Exception:
     296                          msg = ('#TRACEBACK', format_exc())
     297  
     298              except EOFError:
     299                  util.debug('got EOF -- exiting thread serving %r',
     300                             threading.current_thread().name)
     301                  sys.exit(0)
     302  
     303              except Exception:
     304                  msg = ('#TRACEBACK', format_exc())
     305  
     306              try:
     307                  try:
     308                      send(msg)
     309                  except Exception:
     310                      send(('#UNSERIALIZABLE', format_exc()))
     311              except Exception as e:
     312                  util.info('exception in thread serving %r',
     313                          threading.current_thread().name)
     314                  util.info(' ... message was %r', msg)
     315                  util.info(' ... exception was %r', e)
     316                  conn.close()
     317                  sys.exit(1)
     318  
     319      def fallback_getvalue(self, conn, ident, obj):
     320          return obj
     321  
     322      def fallback_str(self, conn, ident, obj):
     323          return str(obj)
     324  
     325      def fallback_repr(self, conn, ident, obj):
     326          return repr(obj)
     327  
     328      fallback_mapping = {
     329          '__str__':fallback_str,
     330          '__repr__':fallback_repr,
     331          '#GETVALUE':fallback_getvalue
     332          }
     333  
     334      def dummy(self, c):
     335          pass
     336  
     337      def debug_info(self, c):
     338          '''
     339          Return some info --- useful to spot problems with refcounting
     340          '''
     341          # Perhaps include debug info about 'c'?
     342          with self.mutex:
     343              result = []
     344              keys = list(self.id_to_refcount.keys())
     345              keys.sort()
     346              for ident in keys:
     347                  if ident != '0':
     348                      result.append('  %s:       refcount=%s\n    %s' %
     349                                    (ident, self.id_to_refcount[ident],
     350                                     str(self.id_to_obj[ident][0])[:75]))
     351              return '\n'.join(result)
     352  
     353      def number_of_objects(self, c):
     354          '''
     355          Number of shared objects
     356          '''
     357          # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
     358          return len(self.id_to_refcount)
     359  
     360      def shutdown(self, c):
     361          '''
     362          Shutdown this process
     363          '''
     364          try:
     365              util.debug('manager received shutdown message')
     366              c.send(('#RETURN', None))
     367          except:
     368              import traceback
     369              traceback.print_exc()
     370          finally:
     371              self.stop_event.set()
     372  
     373      def create(self, c, typeid, /, *args, **kwds):
     374          '''
     375          Create a new shared object and return its id
     376          '''
     377          with self.mutex:
     378              callable, exposed, method_to_typeid, proxytype = \
     379                        self.registry[typeid]
     380  
     381              if callable is None:
     382                  if kwds or (len(args) != 1):
     383                      raise ValueError(
     384                          "Without callable, must have one non-keyword argument")
     385                  obj = args[0]
     386              else:
     387                  obj = callable(*args, **kwds)
     388  
     389              if exposed is None:
     390                  exposed = public_methods(obj)
     391              if method_to_typeid is not None:
     392                  if not isinstance(method_to_typeid, dict):
     393                      raise TypeError(
     394                          "Method_to_typeid {0!r}: type {1!s}, not dict".format(
     395                              method_to_typeid, type(method_to_typeid)))
     396                  exposed = list(exposed) + list(method_to_typeid)
     397  
     398              ident = '%x' % id(obj)  # convert to string because xmlrpclib
     399                                      # only has 32 bit signed integers
     400              util.debug('%r callable returned object with id %r', typeid, ident)
     401  
     402              self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
     403              if ident not in self.id_to_refcount:
     404                  self.id_to_refcount[ident] = 0
     405  
     406          self.incref(c, ident)
     407          return ident, tuple(exposed)
     408  
     409      def get_methods(self, c, token):
     410          '''
     411          Return the methods of the shared object indicated by token
     412          '''
     413          return tuple(self.id_to_obj[token.id][1])
     414  
     415      def accept_connection(self, c, name):
     416          '''
     417          Spawn a new thread to serve this connection
     418          '''
     419          threading.current_thread().name = name
     420          c.send(('#RETURN', None))
     421          self.serve_client(c)
     422  
     423      def incref(self, c, ident):
     424          with self.mutex:
     425              try:
     426                  self.id_to_refcount[ident] += 1
     427              except KeyError as ke:
     428                  # If no external references exist but an internal (to the
     429                  # manager) still does and a new external reference is created
     430                  # from it, restore the manager's tracking of it from the
     431                  # previously stashed internal ref.
     432                  if ident in self.id_to_local_proxy_obj:
     433                      self.id_to_refcount[ident] = 1
     434                      self.id_to_obj[ident] = \
     435                          self.id_to_local_proxy_obj[ident]
     436                      util.debug('Server re-enabled tracking & INCREF %r', ident)
     437                  else:
     438                      raise ke
     439  
     440      def decref(self, c, ident):
     441          if ident not in self.id_to_refcount and \
     442              ident in self.id_to_local_proxy_obj:
     443              util.debug('Server DECREF skipping %r', ident)
     444              return
     445  
     446          with self.mutex:
     447              if self.id_to_refcount[ident] <= 0:
     448                  raise AssertionError(
     449                      "Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format(
     450                          ident, self.id_to_obj[ident],
     451                          self.id_to_refcount[ident]))
     452              self.id_to_refcount[ident] -= 1
     453              if self.id_to_refcount[ident] == 0:
     454                  del self.id_to_refcount[ident]
     455  
     456          if ident not in self.id_to_refcount:
     457              # Two-step process in case the object turns out to contain other
     458              # proxy objects (e.g. a managed list of managed lists).
     459              # Otherwise, deleting self.id_to_obj[ident] would trigger the
     460              # deleting of the stored value (another managed object) which would
     461              # in turn attempt to acquire the mutex that is already held here.
     462              self.id_to_obj[ident] = (None, (), None)  # thread-safe
     463              util.debug('disposing of obj with id %r', ident)
     464              with self.mutex:
     465                  del self.id_to_obj[ident]
     466  
     467  
     468  #
     469  # Class to represent state of a manager
     470  #
     471  
     472  class ESC[4;38;5;81mState(ESC[4;38;5;149mobject):
     473      __slots__ = ['value']
     474      INITIAL = 0
     475      STARTED = 1
     476      SHUTDOWN = 2
     477  
     478  #
     479  # Mapping from serializer name to Listener and Client types
     480  #
     481  
     482  listener_client = {
     483      'pickle' : (connection.Listener, connection.Client),
     484      'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
     485      }
     486  
     487  #
     488  # Definition of BaseManager
     489  #
     490  
     491  class ESC[4;38;5;81mBaseManager(ESC[4;38;5;149mobject):
     492      '''
     493      Base class for managers
     494      '''
     495      _registry = {}
     496      _Server = Server
     497  
     498      def __init__(self, address=None, authkey=None, serializer='pickle',
     499                   ctx=None, *, shutdown_timeout=1.0):
     500          if authkey is None:
     501              authkey = process.current_process().authkey
     502          self._address = address     # XXX not final address if eg ('', 0)
     503          self._authkey = process.AuthenticationString(authkey)
     504          self._state = State()
     505          self._state.value = State.INITIAL
     506          self._serializer = serializer
     507          self._Listener, self._Client = listener_client[serializer]
     508          self._ctx = ctx or get_context()
     509          self._shutdown_timeout = shutdown_timeout
     510  
     511      def get_server(self):
     512          '''
     513          Return server object with serve_forever() method and address attribute
     514          '''
     515          if self._state.value != State.INITIAL:
     516              if self._state.value == State.STARTED:
     517                  raise ProcessError("Already started server")
     518              elif self._state.value == State.SHUTDOWN:
     519                  raise ProcessError("Manager has shut down")
     520              else:
     521                  raise ProcessError(
     522                      "Unknown state {!r}".format(self._state.value))
     523          return Server(self._registry, self._address,
     524                        self._authkey, self._serializer)
     525  
     526      def connect(self):
     527          '''
     528          Connect manager object to the server process
     529          '''
     530          Listener, Client = listener_client[self._serializer]
     531          conn = Client(self._address, authkey=self._authkey)
     532          dispatch(conn, None, 'dummy')
     533          self._state.value = State.STARTED
     534  
     535      def start(self, initializer=None, initargs=()):
     536          '''
     537          Spawn a server process for this manager object
     538          '''
     539          if self._state.value != State.INITIAL:
     540              if self._state.value == State.STARTED:
     541                  raise ProcessError("Already started server")
     542              elif self._state.value == State.SHUTDOWN:
     543                  raise ProcessError("Manager has shut down")
     544              else:
     545                  raise ProcessError(
     546                      "Unknown state {!r}".format(self._state.value))
     547  
     548          if initializer is not None and not callable(initializer):
     549              raise TypeError('initializer must be a callable')
     550  
     551          # pipe over which we will retrieve address of server
     552          reader, writer = connection.Pipe(duplex=False)
     553  
     554          # spawn process which runs a server
     555          self._process = self._ctx.Process(
     556              target=type(self)._run_server,
     557              args=(self._registry, self._address, self._authkey,
     558                    self._serializer, writer, initializer, initargs),
     559              )
     560          ident = ':'.join(str(i) for i in self._process._identity)
     561          self._process.name = type(self).__name__  + '-' + ident
     562          self._process.start()
     563  
     564          # get address of server
     565          writer.close()
     566          self._address = reader.recv()
     567          reader.close()
     568  
     569          # register a finalizer
     570          self._state.value = State.STARTED
     571          self.shutdown = util.Finalize(
     572              self, type(self)._finalize_manager,
     573              args=(self._process, self._address, self._authkey, self._state,
     574                    self._Client, self._shutdown_timeout),
     575              exitpriority=0
     576              )
     577  
     578      @classmethod
     579      def _run_server(cls, registry, address, authkey, serializer, writer,
     580                      initializer=None, initargs=()):
     581          '''
     582          Create a server, report its address and run it
     583          '''
     584          # bpo-36368: protect server process from KeyboardInterrupt signals
     585          signal.signal(signal.SIGINT, signal.SIG_IGN)
     586  
     587          if initializer is not None:
     588              initializer(*initargs)
     589  
     590          # create server
     591          server = cls._Server(registry, address, authkey, serializer)
     592  
     593          # inform parent process of the server's address
     594          writer.send(server.address)
     595          writer.close()
     596  
     597          # run the manager
     598          util.info('manager serving at %r', server.address)
     599          server.serve_forever()
     600  
     601      def _create(self, typeid, /, *args, **kwds):
     602          '''
     603          Create a new shared object; return the token and exposed tuple
     604          '''
     605          assert self._state.value == State.STARTED, 'server not yet started'
     606          conn = self._Client(self._address, authkey=self._authkey)
     607          try:
     608              id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
     609          finally:
     610              conn.close()
     611          return Token(typeid, self._address, id), exposed
     612  
     613      def join(self, timeout=None):
     614          '''
     615          Join the manager process (if it has been spawned)
     616          '''
     617          if self._process is not None:
     618              self._process.join(timeout)
     619              if not self._process.is_alive():
     620                  self._process = None
     621  
     622      def _debug_info(self):
     623          '''
     624          Return some info about the servers shared objects and connections
     625          '''
     626          conn = self._Client(self._address, authkey=self._authkey)
     627          try:
     628              return dispatch(conn, None, 'debug_info')
     629          finally:
     630              conn.close()
     631  
     632      def _number_of_objects(self):
     633          '''
     634          Return the number of shared objects
     635          '''
     636          conn = self._Client(self._address, authkey=self._authkey)
     637          try:
     638              return dispatch(conn, None, 'number_of_objects')
     639          finally:
     640              conn.close()
     641  
     642      def __enter__(self):
     643          if self._state.value == State.INITIAL:
     644              self.start()
     645          if self._state.value != State.STARTED:
     646              if self._state.value == State.INITIAL:
     647                  raise ProcessError("Unable to start server")
     648              elif self._state.value == State.SHUTDOWN:
     649                  raise ProcessError("Manager has shut down")
     650              else:
     651                  raise ProcessError(
     652                      "Unknown state {!r}".format(self._state.value))
     653          return self
     654  
     655      def __exit__(self, exc_type, exc_val, exc_tb):
     656          self.shutdown()
     657  
     658      @staticmethod
     659      def _finalize_manager(process, address, authkey, state, _Client,
     660                            shutdown_timeout):
     661          '''
     662          Shutdown the manager process; will be registered as a finalizer
     663          '''
     664          if process.is_alive():
     665              util.info('sending shutdown message to manager')
     666              try:
     667                  conn = _Client(address, authkey=authkey)
     668                  try:
     669                      dispatch(conn, None, 'shutdown')
     670                  finally:
     671                      conn.close()
     672              except Exception:
     673                  pass
     674  
     675              process.join(timeout=shutdown_timeout)
     676              if process.is_alive():
     677                  util.info('manager still alive')
     678                  if hasattr(process, 'terminate'):
     679                      util.info('trying to `terminate()` manager process')
     680                      process.terminate()
     681                      process.join(timeout=shutdown_timeout)
     682                      if process.is_alive():
     683                          util.info('manager still alive after terminate')
     684                          process.kill()
     685                          process.join()
     686  
     687          state.value = State.SHUTDOWN
     688          try:
     689              del BaseProxy._address_to_local[address]
     690          except KeyError:
     691              pass
     692  
     693      @property
     694      def address(self):
     695          return self._address
     696  
     697      @classmethod
     698      def register(cls, typeid, callable=None, proxytype=None, exposed=None,
     699                   method_to_typeid=None, create_method=True):
     700          '''
     701          Register a typeid with the manager type
     702          '''
     703          if '_registry' not in cls.__dict__:
     704              cls._registry = cls._registry.copy()
     705  
     706          if proxytype is None:
     707              proxytype = AutoProxy
     708  
     709          exposed = exposed or getattr(proxytype, '_exposed_', None)
     710  
     711          method_to_typeid = method_to_typeid or \
     712                             getattr(proxytype, '_method_to_typeid_', None)
     713  
     714          if method_to_typeid:
     715              for key, value in list(method_to_typeid.items()): # isinstance?
     716                  assert type(key) is str, '%r is not a string' % key
     717                  assert type(value) is str, '%r is not a string' % value
     718  
     719          cls._registry[typeid] = (
     720              callable, exposed, method_to_typeid, proxytype
     721              )
     722  
     723          if create_method:
     724              def temp(self, /, *args, **kwds):
     725                  util.debug('requesting creation of a shared %r object', typeid)
     726                  token, exp = self._create(typeid, *args, **kwds)
     727                  proxy = proxytype(
     728                      token, self._serializer, manager=self,
     729                      authkey=self._authkey, exposed=exp
     730                      )
     731                  conn = self._Client(token.address, authkey=self._authkey)
     732                  dispatch(conn, None, 'decref', (token.id,))
     733                  return proxy
     734              temp.__name__ = typeid
     735              setattr(cls, typeid, temp)
     736  
     737  #
     738  # Subclass of set which get cleared after a fork
     739  #
     740  
     741  class ESC[4;38;5;81mProcessLocalSet(ESC[4;38;5;149mset):
     742      def __init__(self):
     743          util.register_after_fork(self, lambda obj: obj.clear())
     744      def __reduce__(self):
     745          return type(self), ()
     746  
     747  #
     748  # Definition of BaseProxy
     749  #
     750  
     751  class ESC[4;38;5;81mBaseProxy(ESC[4;38;5;149mobject):
     752      '''
     753      A base for proxies of shared objects
     754      '''
     755      _address_to_local = {}
     756      _mutex = util.ForkAwareThreadLock()
     757  
     758      def __init__(self, token, serializer, manager=None,
     759                   authkey=None, exposed=None, incref=True, manager_owned=False):
     760          with BaseProxy._mutex:
     761              tls_idset = BaseProxy._address_to_local.get(token.address, None)
     762              if tls_idset is None:
     763                  tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
     764                  BaseProxy._address_to_local[token.address] = tls_idset
     765  
     766          # self._tls is used to record the connection used by this
     767          # thread to communicate with the manager at token.address
     768          self._tls = tls_idset[0]
     769  
     770          # self._idset is used to record the identities of all shared
     771          # objects for which the current process owns references and
     772          # which are in the manager at token.address
     773          self._idset = tls_idset[1]
     774  
     775          self._token = token
     776          self._id = self._token.id
     777          self._manager = manager
     778          self._serializer = serializer
     779          self._Client = listener_client[serializer][1]
     780  
     781          # Should be set to True only when a proxy object is being created
     782          # on the manager server; primary use case: nested proxy objects.
     783          # RebuildProxy detects when a proxy is being created on the manager
     784          # and sets this value appropriately.
     785          self._owned_by_manager = manager_owned
     786  
     787          if authkey is not None:
     788              self._authkey = process.AuthenticationString(authkey)
     789          elif self._manager is not None:
     790              self._authkey = self._manager._authkey
     791          else:
     792              self._authkey = process.current_process().authkey
     793  
     794          if incref:
     795              self._incref()
     796  
     797          util.register_after_fork(self, BaseProxy._after_fork)
     798  
     799      def _connect(self):
     800          util.debug('making connection to manager')
     801          name = process.current_process().name
     802          if threading.current_thread().name != 'MainThread':
     803              name += '|' + threading.current_thread().name
     804          conn = self._Client(self._token.address, authkey=self._authkey)
     805          dispatch(conn, None, 'accept_connection', (name,))
     806          self._tls.connection = conn
     807  
     808      def _callmethod(self, methodname, args=(), kwds={}):
     809          '''
     810          Try to call a method of the referent and return a copy of the result
     811          '''
     812          try:
     813              conn = self._tls.connection
     814          except AttributeError:
     815              util.debug('thread %r does not own a connection',
     816                         threading.current_thread().name)
     817              self._connect()
     818              conn = self._tls.connection
     819  
     820          conn.send((self._id, methodname, args, kwds))
     821          kind, result = conn.recv()
     822  
     823          if kind == '#RETURN':
     824              return result
     825          elif kind == '#PROXY':
     826              exposed, token = result
     827              proxytype = self._manager._registry[token.typeid][-1]
     828              token.address = self._token.address
     829              proxy = proxytype(
     830                  token, self._serializer, manager=self._manager,
     831                  authkey=self._authkey, exposed=exposed
     832                  )
     833              conn = self._Client(token.address, authkey=self._authkey)
     834              dispatch(conn, None, 'decref', (token.id,))
     835              return proxy
     836          raise convert_to_error(kind, result)
     837  
     838      def _getvalue(self):
     839          '''
     840          Get a copy of the value of the referent
     841          '''
     842          return self._callmethod('#GETVALUE')
     843  
     844      def _incref(self):
     845          if self._owned_by_manager:
     846              util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
     847              return
     848  
     849          conn = self._Client(self._token.address, authkey=self._authkey)
     850          dispatch(conn, None, 'incref', (self._id,))
     851          util.debug('INCREF %r', self._token.id)
     852  
     853          self._idset.add(self._id)
     854  
     855          state = self._manager and self._manager._state
     856  
     857          self._close = util.Finalize(
     858              self, BaseProxy._decref,
     859              args=(self._token, self._authkey, state,
     860                    self._tls, self._idset, self._Client),
     861              exitpriority=10
     862              )
     863  
     864      @staticmethod
     865      def _decref(token, authkey, state, tls, idset, _Client):
     866          idset.discard(token.id)
     867  
     868          # check whether manager is still alive
     869          if state is None or state.value == State.STARTED:
     870              # tell manager this process no longer cares about referent
     871              try:
     872                  util.debug('DECREF %r', token.id)
     873                  conn = _Client(token.address, authkey=authkey)
     874                  dispatch(conn, None, 'decref', (token.id,))
     875              except Exception as e:
     876                  util.debug('... decref failed %s', e)
     877  
     878          else:
     879              util.debug('DECREF %r -- manager already shutdown', token.id)
     880  
     881          # check whether we can close this thread's connection because
     882          # the process owns no more references to objects for this manager
     883          if not idset and hasattr(tls, 'connection'):
     884              util.debug('thread %r has no more proxies so closing conn',
     885                         threading.current_thread().name)
     886              tls.connection.close()
     887              del tls.connection
     888  
     889      def _after_fork(self):
     890          self._manager = None
     891          try:
     892              self._incref()
     893          except Exception as e:
     894              # the proxy may just be for a manager which has shutdown
     895              util.info('incref failed: %s' % e)
     896  
     897      def __reduce__(self):
     898          kwds = {}
     899          if get_spawning_popen() is not None:
     900              kwds['authkey'] = self._authkey
     901  
     902          if getattr(self, '_isauto', False):
     903              kwds['exposed'] = self._exposed_
     904              return (RebuildProxy,
     905                      (AutoProxy, self._token, self._serializer, kwds))
     906          else:
     907              return (RebuildProxy,
     908                      (type(self), self._token, self._serializer, kwds))
     909  
     910      def __deepcopy__(self, memo):
     911          return self._getvalue()
     912  
     913      def __repr__(self):
     914          return '<%s object, typeid %r at %#x>' % \
     915                 (type(self).__name__, self._token.typeid, id(self))
     916  
     917      def __str__(self):
     918          '''
     919          Return representation of the referent (or a fall-back if that fails)
     920          '''
     921          try:
     922              return self._callmethod('__repr__')
     923          except Exception:
     924              return repr(self)[:-1] + "; '__str__()' failed>"
     925  
     926  #
     927  # Function used for unpickling
     928  #
     929  
     930  def RebuildProxy(func, token, serializer, kwds):
     931      '''
     932      Function used for unpickling proxy objects.
     933      '''
     934      server = getattr(process.current_process(), '_manager_server', None)
     935      if server and server.address == token.address:
     936          util.debug('Rebuild a proxy owned by manager, token=%r', token)
     937          kwds['manager_owned'] = True
     938          if token.id not in server.id_to_local_proxy_obj:
     939              server.id_to_local_proxy_obj[token.id] = \
     940                  server.id_to_obj[token.id]
     941      incref = (
     942          kwds.pop('incref', True) and
     943          not getattr(process.current_process(), '_inheriting', False)
     944          )
     945      return func(token, serializer, incref=incref, **kwds)
     946  
     947  #
     948  # Functions to create proxies and proxy types
     949  #
     950  
     951  def MakeProxyType(name, exposed, _cache={}):
     952      '''
     953      Return a proxy type whose methods are given by `exposed`
     954      '''
     955      exposed = tuple(exposed)
     956      try:
     957          return _cache[(name, exposed)]
     958      except KeyError:
     959          pass
     960  
     961      dic = {}
     962  
     963      for meth in exposed:
     964          exec('''def %s(self, /, *args, **kwds):
     965          return self._callmethod(%r, args, kwds)''' % (meth, meth), dic)
     966  
     967      ProxyType = type(name, (BaseProxy,), dic)
     968      ProxyType._exposed_ = exposed
     969      _cache[(name, exposed)] = ProxyType
     970      return ProxyType
     971  
     972  
     973  def AutoProxy(token, serializer, manager=None, authkey=None,
     974                exposed=None, incref=True, manager_owned=False):
     975      '''
     976      Return an auto-proxy for `token`
     977      '''
     978      _Client = listener_client[serializer][1]
     979  
     980      if exposed is None:
     981          conn = _Client(token.address, authkey=authkey)
     982          try:
     983              exposed = dispatch(conn, None, 'get_methods', (token,))
     984          finally:
     985              conn.close()
     986  
     987      if authkey is None and manager is not None:
     988          authkey = manager._authkey
     989      if authkey is None:
     990          authkey = process.current_process().authkey
     991  
     992      ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
     993      proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
     994                        incref=incref, manager_owned=manager_owned)
     995      proxy._isauto = True
     996      return proxy
     997  
     998  #
     999  # Types/callables which we will register with SyncManager
    1000  #
    1001  
    1002  class ESC[4;38;5;81mNamespace(ESC[4;38;5;149mobject):
    1003      def __init__(self, /, **kwds):
    1004          self.__dict__.update(kwds)
    1005      def __repr__(self):
    1006          items = list(self.__dict__.items())
    1007          temp = []
    1008          for name, value in items:
    1009              if not name.startswith('_'):
    1010                  temp.append('%s=%r' % (name, value))
    1011          temp.sort()
    1012          return '%s(%s)' % (self.__class__.__name__, ', '.join(temp))
    1013  
    1014  class ESC[4;38;5;81mValue(ESC[4;38;5;149mobject):
    1015      def __init__(self, typecode, value, lock=True):
    1016          self._typecode = typecode
    1017          self._value = value
    1018      def get(self):
    1019          return self._value
    1020      def set(self, value):
    1021          self._value = value
    1022      def __repr__(self):
    1023          return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
    1024      value = property(get, set)
    1025  
    1026  def Array(typecode, sequence, lock=True):
    1027      return array.array(typecode, sequence)
    1028  
    1029  #
    1030  # Proxy types used by SyncManager
    1031  #
    1032  
    1033  class ESC[4;38;5;81mIteratorProxy(ESC[4;38;5;149mBaseProxy):
    1034      _exposed_ = ('__next__', 'send', 'throw', 'close')
    1035      def __iter__(self):
    1036          return self
    1037      def __next__(self, *args):
    1038          return self._callmethod('__next__', args)
    1039      def send(self, *args):
    1040          return self._callmethod('send', args)
    1041      def throw(self, *args):
    1042          return self._callmethod('throw', args)
    1043      def close(self, *args):
    1044          return self._callmethod('close', args)
    1045  
    1046  
    1047  class ESC[4;38;5;81mAcquirerProxy(ESC[4;38;5;149mBaseProxy):
    1048      _exposed_ = ('acquire', 'release')
    1049      def acquire(self, blocking=True, timeout=None):
    1050          args = (blocking,) if timeout is None else (blocking, timeout)
    1051          return self._callmethod('acquire', args)
    1052      def release(self):
    1053          return self._callmethod('release')
    1054      def __enter__(self):
    1055          return self._callmethod('acquire')
    1056      def __exit__(self, exc_type, exc_val, exc_tb):
    1057          return self._callmethod('release')
    1058  
    1059  
    1060  class ESC[4;38;5;81mConditionProxy(ESC[4;38;5;149mAcquirerProxy):
    1061      _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
    1062      def wait(self, timeout=None):
    1063          return self._callmethod('wait', (timeout,))
    1064      def notify(self, n=1):
    1065          return self._callmethod('notify', (n,))
    1066      def notify_all(self):
    1067          return self._callmethod('notify_all')
    1068      def wait_for(self, predicate, timeout=None):
    1069          result = predicate()
    1070          if result:
    1071              return result
    1072          if timeout is not None:
    1073              endtime = time.monotonic() + timeout
    1074          else:
    1075              endtime = None
    1076              waittime = None
    1077          while not result:
    1078              if endtime is not None:
    1079                  waittime = endtime - time.monotonic()
    1080                  if waittime <= 0:
    1081                      break
    1082              self.wait(waittime)
    1083              result = predicate()
    1084          return result
    1085  
    1086  
    1087  class ESC[4;38;5;81mEventProxy(ESC[4;38;5;149mBaseProxy):
    1088      _exposed_ = ('is_set', 'set', 'clear', 'wait')
    1089      def is_set(self):
    1090          return self._callmethod('is_set')
    1091      def set(self):
    1092          return self._callmethod('set')
    1093      def clear(self):
    1094          return self._callmethod('clear')
    1095      def wait(self, timeout=None):
    1096          return self._callmethod('wait', (timeout,))
    1097  
    1098  
    1099  class ESC[4;38;5;81mBarrierProxy(ESC[4;38;5;149mBaseProxy):
    1100      _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')
    1101      def wait(self, timeout=None):
    1102          return self._callmethod('wait', (timeout,))
    1103      def abort(self):
    1104          return self._callmethod('abort')
    1105      def reset(self):
    1106          return self._callmethod('reset')
    1107      @property
    1108      def parties(self):
    1109          return self._callmethod('__getattribute__', ('parties',))
    1110      @property
    1111      def n_waiting(self):
    1112          return self._callmethod('__getattribute__', ('n_waiting',))
    1113      @property
    1114      def broken(self):
    1115          return self._callmethod('__getattribute__', ('broken',))
    1116  
    1117  
    1118  class ESC[4;38;5;81mNamespaceProxy(ESC[4;38;5;149mBaseProxy):
    1119      _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
    1120      def __getattr__(self, key):
    1121          if key[0] == '_':
    1122              return object.__getattribute__(self, key)
    1123          callmethod = object.__getattribute__(self, '_callmethod')
    1124          return callmethod('__getattribute__', (key,))
    1125      def __setattr__(self, key, value):
    1126          if key[0] == '_':
    1127              return object.__setattr__(self, key, value)
    1128          callmethod = object.__getattribute__(self, '_callmethod')
    1129          return callmethod('__setattr__', (key, value))
    1130      def __delattr__(self, key):
    1131          if key[0] == '_':
    1132              return object.__delattr__(self, key)
    1133          callmethod = object.__getattribute__(self, '_callmethod')
    1134          return callmethod('__delattr__', (key,))
    1135  
    1136  
    1137  class ESC[4;38;5;81mValueProxy(ESC[4;38;5;149mBaseProxy):
    1138      _exposed_ = ('get', 'set')
    1139      def get(self):
    1140          return self._callmethod('get')
    1141      def set(self, value):
    1142          return self._callmethod('set', (value,))
    1143      value = property(get, set)
    1144  
    1145      __class_getitem__ = classmethod(types.GenericAlias)
    1146  
    1147  
    1148  BaseListProxy = MakeProxyType('BaseListProxy', (
    1149      '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
    1150      '__mul__', '__reversed__', '__rmul__', '__setitem__',
    1151      'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
    1152      'reverse', 'sort', '__imul__'
    1153      ))
    1154  class ESC[4;38;5;81mListProxy(ESC[4;38;5;149mBaseListProxy):
    1155      def __iadd__(self, value):
    1156          self._callmethod('extend', (value,))
    1157          return self
    1158      def __imul__(self, value):
    1159          self._callmethod('__imul__', (value,))
    1160          return self
    1161  
    1162  
    1163  DictProxy = MakeProxyType('DictProxy', (
    1164      '__contains__', '__delitem__', '__getitem__', '__iter__', '__len__',
    1165      '__setitem__', 'clear', 'copy', 'get', 'items',
    1166      'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
    1167      ))
    1168  DictProxy._method_to_typeid_ = {
    1169      '__iter__': 'Iterator',
    1170      }
    1171  
    1172  
    1173  ArrayProxy = MakeProxyType('ArrayProxy', (
    1174      '__len__', '__getitem__', '__setitem__'
    1175      ))
    1176  
    1177  
    1178  BasePoolProxy = MakeProxyType('PoolProxy', (
    1179      'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
    1180      'map', 'map_async', 'starmap', 'starmap_async', 'terminate',
    1181      ))
    1182  BasePoolProxy._method_to_typeid_ = {
    1183      'apply_async': 'AsyncResult',
    1184      'map_async': 'AsyncResult',
    1185      'starmap_async': 'AsyncResult',
    1186      'imap': 'Iterator',
    1187      'imap_unordered': 'Iterator'
    1188      }
    1189  class ESC[4;38;5;81mPoolProxy(ESC[4;38;5;149mBasePoolProxy):
    1190      def __enter__(self):
    1191          return self
    1192      def __exit__(self, exc_type, exc_val, exc_tb):
    1193          self.terminate()
    1194  
    1195  #
    1196  # Definition of SyncManager
    1197  #
    1198  
    1199  class ESC[4;38;5;81mSyncManager(ESC[4;38;5;149mBaseManager):
    1200      '''
    1201      Subclass of `BaseManager` which supports a number of shared object types.
    1202  
    1203      The types registered are those intended for the synchronization
    1204      of threads, plus `dict`, `list` and `Namespace`.
    1205  
    1206      The `multiprocessing.Manager()` function creates started instances of
    1207      this class.
    1208      '''
    1209  
    1210  SyncManager.register('Queue', queue.Queue)
    1211  SyncManager.register('JoinableQueue', queue.Queue)
    1212  SyncManager.register('Event', threading.Event, EventProxy)
    1213  SyncManager.register('Lock', threading.Lock, AcquirerProxy)
    1214  SyncManager.register('RLock', threading.RLock, AcquirerProxy)
    1215  SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
    1216  SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
    1217                       AcquirerProxy)
    1218  SyncManager.register('Condition', threading.Condition, ConditionProxy)
    1219  SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
    1220  SyncManager.register('Pool', pool.Pool, PoolProxy)
    1221  SyncManager.register('list', list, ListProxy)
    1222  SyncManager.register('dict', dict, DictProxy)
    1223  SyncManager.register('Value', Value, ValueProxy)
    1224  SyncManager.register('Array', Array, ArrayProxy)
    1225  SyncManager.register('Namespace', Namespace, NamespaceProxy)
    1226  
    1227  # types returned by methods of PoolProxy
    1228  SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
    1229  SyncManager.register('AsyncResult', create_method=False)
    1230  
    1231  #
    1232  # Definition of SharedMemoryManager and SharedMemoryServer
    1233  #
    1234  
    1235  if HAS_SHMEM:
    1236      class ESC[4;38;5;81m_SharedMemoryTracker:
    1237          "Manages one or more shared memory segments."
    1238  
    1239          def __init__(self, name, segment_names=[]):
    1240              self.shared_memory_context_name = name
    1241              self.segment_names = segment_names
    1242  
    1243          def register_segment(self, segment_name):
    1244              "Adds the supplied shared memory block name to tracker."
    1245              util.debug(f"Register segment {segment_name!r} in pid {getpid()}")
    1246              self.segment_names.append(segment_name)
    1247  
    1248          def destroy_segment(self, segment_name):
    1249              """Calls unlink() on the shared memory block with the supplied name
    1250              and removes it from the list of blocks being tracked."""
    1251              util.debug(f"Destroy segment {segment_name!r} in pid {getpid()}")
    1252              self.segment_names.remove(segment_name)
    1253              segment = shared_memory.SharedMemory(segment_name)
    1254              segment.close()
    1255              segment.unlink()
    1256  
    1257          def unlink(self):
    1258              "Calls destroy_segment() on all tracked shared memory blocks."
    1259              for segment_name in self.segment_names[:]:
    1260                  self.destroy_segment(segment_name)
    1261  
    1262          def __del__(self):
    1263              util.debug(f"Call {self.__class__.__name__}.__del__ in {getpid()}")
    1264              self.unlink()
    1265  
    1266          def __getstate__(self):
    1267              return (self.shared_memory_context_name, self.segment_names)
    1268  
    1269          def __setstate__(self, state):
    1270              self.__init__(*state)
    1271  
    1272  
    1273      class ESC[4;38;5;81mSharedMemoryServer(ESC[4;38;5;149mServer):
    1274  
    1275          public = Server.public + \
    1276                   ['track_segment', 'release_segment', 'list_segments']
    1277  
    1278          def __init__(self, *args, **kwargs):
    1279              Server.__init__(self, *args, **kwargs)
    1280              address = self.address
    1281              # The address of Linux abstract namespaces can be bytes
    1282              if isinstance(address, bytes):
    1283                  address = os.fsdecode(address)
    1284              self.shared_memory_context = \
    1285                  _SharedMemoryTracker(f"shm_{address}_{getpid()}")
    1286              util.debug(f"SharedMemoryServer started by pid {getpid()}")
    1287  
    1288          def create(self, c, typeid, /, *args, **kwargs):
    1289              """Create a new distributed-shared object (not backed by a shared
    1290              memory block) and return its id to be used in a Proxy Object."""
    1291              # Unless set up as a shared proxy, don't make shared_memory_context
    1292              # a standard part of kwargs.  This makes things easier for supplying
    1293              # simple functions.
    1294              if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"):
    1295                  kwargs['shared_memory_context'] = self.shared_memory_context
    1296              return Server.create(self, c, typeid, *args, **kwargs)
    1297  
    1298          def shutdown(self, c):
    1299              "Call unlink() on all tracked shared memory, terminate the Server."
    1300              self.shared_memory_context.unlink()
    1301              return Server.shutdown(self, c)
    1302  
    1303          def track_segment(self, c, segment_name):
    1304              "Adds the supplied shared memory block name to Server's tracker."
    1305              self.shared_memory_context.register_segment(segment_name)
    1306  
    1307          def release_segment(self, c, segment_name):
    1308              """Calls unlink() on the shared memory block with the supplied name
    1309              and removes it from the tracker instance inside the Server."""
    1310              self.shared_memory_context.destroy_segment(segment_name)
    1311  
    1312          def list_segments(self, c):
    1313              """Returns a list of names of shared memory blocks that the Server
    1314              is currently tracking."""
    1315              return self.shared_memory_context.segment_names
    1316  
    1317  
    1318      class ESC[4;38;5;81mSharedMemoryManager(ESC[4;38;5;149mBaseManager):
    1319          """Like SyncManager but uses SharedMemoryServer instead of Server.
    1320  
    1321          It provides methods for creating and returning SharedMemory instances
    1322          and for creating a list-like object (ShareableList) backed by shared
    1323          memory.  It also provides methods that create and return Proxy Objects
    1324          that support synchronization across processes (i.e. multi-process-safe
    1325          locks and semaphores).
    1326          """
    1327  
    1328          _Server = SharedMemoryServer
    1329  
    1330          def __init__(self, *args, **kwargs):
    1331              if os.name == "posix":
    1332                  # bpo-36867: Ensure the resource_tracker is running before
    1333                  # launching the manager process, so that concurrent
    1334                  # shared_memory manipulation both in the manager and in the
    1335                  # current process does not create two resource_tracker
    1336                  # processes.
    1337                  from . import resource_tracker
    1338                  resource_tracker.ensure_running()
    1339              BaseManager.__init__(self, *args, **kwargs)
    1340              util.debug(f"{self.__class__.__name__} created by pid {getpid()}")
    1341  
    1342          def __del__(self):
    1343              util.debug(f"{self.__class__.__name__}.__del__ by pid {getpid()}")
    1344  
    1345          def get_server(self):
    1346              'Better than monkeypatching for now; merge into Server ultimately'
    1347              if self._state.value != State.INITIAL:
    1348                  if self._state.value == State.STARTED:
    1349                      raise ProcessError("Already started SharedMemoryServer")
    1350                  elif self._state.value == State.SHUTDOWN:
    1351                      raise ProcessError("SharedMemoryManager has shut down")
    1352                  else:
    1353                      raise ProcessError(
    1354                          "Unknown state {!r}".format(self._state.value))
    1355              return self._Server(self._registry, self._address,
    1356                                  self._authkey, self._serializer)
    1357  
    1358          def SharedMemory(self, size):
    1359              """Returns a new SharedMemory instance with the specified size in
    1360              bytes, to be tracked by the manager."""
    1361              with self._Client(self._address, authkey=self._authkey) as conn:
    1362                  sms = shared_memory.SharedMemory(None, create=True, size=size)
    1363                  try:
    1364                      dispatch(conn, None, 'track_segment', (sms.name,))
    1365                  except BaseException as e:
    1366                      sms.unlink()
    1367                      raise e
    1368              return sms
    1369  
    1370          def ShareableList(self, sequence):
    1371              """Returns a new ShareableList instance populated with the values
    1372              from the input sequence, to be tracked by the manager."""
    1373              with self._Client(self._address, authkey=self._authkey) as conn:
    1374                  sl = shared_memory.ShareableList(sequence)
    1375                  try:
    1376                      dispatch(conn, None, 'track_segment', (sl.shm.name,))
    1377                  except BaseException as e:
    1378                      sl.shm.unlink()
    1379                      raise e
    1380              return sl