(root)/
Python-3.11.7/
Lib/
multiprocessing/
managers.py
       1  #
       2  # Module providing manager classes for dealing
       3  # with shared objects
       4  #
       5  # multiprocessing/managers.py
       6  #
       7  # Copyright (c) 2006-2008, R Oudkerk
       8  # Licensed to PSF under a Contributor Agreement.
       9  #
      10  
      11  __all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
      12  
      13  #
      14  # Imports
      15  #
      16  
      17  import sys
      18  import threading
      19  import signal
      20  import array
      21  import queue
      22  import time
      23  import types
      24  import os
      25  from os import getpid
      26  
      27  from traceback import format_exc
      28  
      29  from . import connection
      30  from .context import reduction, get_spawning_popen, ProcessError
      31  from . import pool
      32  from . import process
      33  from . import util
      34  from . import get_context
      35  try:
      36      from . import shared_memory
      37  except ImportError:
      38      HAS_SHMEM = False
      39  else:
      40      HAS_SHMEM = True
      41      __all__.append('SharedMemoryManager')
      42  
      43  #
      44  # Register some things for pickling
      45  #
      46  
      47  def reduce_array(a):
      48      return array.array, (a.typecode, a.tobytes())
      49  reduction.register(array.array, reduce_array)
      50  
      51  view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
      52  def rebuild_as_list(obj):
      53      return list, (list(obj),)
      54  for view_type in view_types:
      55      reduction.register(view_type, rebuild_as_list)
      56  del view_type, view_types
      57  
      58  #
      59  # Type for identifying shared objects
      60  #
      61  
      62  class ESC[4;38;5;81mToken(ESC[4;38;5;149mobject):
      63      '''
      64      Type to uniquely identify a shared object
      65      '''
      66      __slots__ = ('typeid', 'address', 'id')
      67  
      68      def __init__(self, typeid, address, id):
      69          (self.typeid, self.address, self.id) = (typeid, address, id)
      70  
      71      def __getstate__(self):
      72          return (self.typeid, self.address, self.id)
      73  
      74      def __setstate__(self, state):
      75          (self.typeid, self.address, self.id) = state
      76  
      77      def __repr__(self):
      78          return '%s(typeid=%r, address=%r, id=%r)' % \
      79                 (self.__class__.__name__, self.typeid, self.address, self.id)
      80  
      81  #
      82  # Function for communication with a manager's server process
      83  #
      84  
      85  def dispatch(c, id, methodname, args=(), kwds={}):
      86      '''
      87      Send a message to manager using connection `c` and return response
      88      '''
      89      c.send((id, methodname, args, kwds))
      90      kind, result = c.recv()
      91      if kind == '#RETURN':
      92          return result
      93      raise convert_to_error(kind, result)
      94  
      95  def convert_to_error(kind, result):
      96      if kind == '#ERROR':
      97          return result
      98      elif kind in ('#TRACEBACK', '#UNSERIALIZABLE'):
      99          if not isinstance(result, str):
     100              raise TypeError(
     101                  "Result {0!r} (kind '{1}') type is {2}, not str".format(
     102                      result, kind, type(result)))
     103          if kind == '#UNSERIALIZABLE':
     104              return RemoteError('Unserializable message: %s\n' % result)
     105          else:
     106              return RemoteError(result)
     107      else:
     108          return ValueError('Unrecognized message type {!r}'.format(kind))
     109  
     110  class ESC[4;38;5;81mRemoteError(ESC[4;38;5;149mException):
     111      def __str__(self):
     112          return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75)
     113  
     114  #
     115  # Functions for finding the method names of an object
     116  #
     117  
     118  def all_methods(obj):
     119      '''
     120      Return a list of names of methods of `obj`
     121      '''
     122      temp = []
     123      for name in dir(obj):
     124          func = getattr(obj, name)
     125          if callable(func):
     126              temp.append(name)
     127      return temp
     128  
     129  def public_methods(obj):
     130      '''
     131      Return a list of names of methods of `obj` which do not start with '_'
     132      '''
     133      return [name for name in all_methods(obj) if name[0] != '_']
     134  
     135  #
     136  # Server which is run in a process controlled by a manager
     137  #
     138  
     139  class ESC[4;38;5;81mServer(ESC[4;38;5;149mobject):
     140      '''
     141      Server class which runs in a process controlled by a manager object
     142      '''
     143      public = ['shutdown', 'create', 'accept_connection', 'get_methods',
     144                'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
     145  
     146      def __init__(self, registry, address, authkey, serializer):
     147          if not isinstance(authkey, bytes):
     148              raise TypeError(
     149                  "Authkey {0!r} is type {1!s}, not bytes".format(
     150                      authkey, type(authkey)))
     151          self.registry = registry
     152          self.authkey = process.AuthenticationString(authkey)
     153          Listener, Client = listener_client[serializer]
     154  
     155          # do authentication later
     156          self.listener = Listener(address=address, backlog=16)
     157          self.address = self.listener.address
     158  
     159          self.id_to_obj = {'0': (None, ())}
     160          self.id_to_refcount = {}
     161          self.id_to_local_proxy_obj = {}
     162          self.mutex = threading.Lock()
     163  
     164      def serve_forever(self):
     165          '''
     166          Run the server forever
     167          '''
     168          self.stop_event = threading.Event()
     169          process.current_process()._manager_server = self
     170          try:
     171              accepter = threading.Thread(target=self.accepter)
     172              accepter.daemon = True
     173              accepter.start()
     174              try:
     175                  while not self.stop_event.is_set():
     176                      self.stop_event.wait(1)
     177              except (KeyboardInterrupt, SystemExit):
     178                  pass
     179          finally:
     180              if sys.stdout != sys.__stdout__: # what about stderr?
     181                  util.debug('resetting stdout, stderr')
     182                  sys.stdout = sys.__stdout__
     183                  sys.stderr = sys.__stderr__
     184              sys.exit(0)
     185  
     186      def accepter(self):
     187          while True:
     188              try:
     189                  c = self.listener.accept()
     190              except OSError:
     191                  continue
     192              t = threading.Thread(target=self.handle_request, args=(c,))
     193              t.daemon = True
     194              t.start()
     195  
     196      def _handle_request(self, c):
     197          request = None
     198          try:
     199              connection.deliver_challenge(c, self.authkey)
     200              connection.answer_challenge(c, self.authkey)
     201              request = c.recv()
     202              ignore, funcname, args, kwds = request
     203              assert funcname in self.public, '%r unrecognized' % funcname
     204              func = getattr(self, funcname)
     205          except Exception:
     206              msg = ('#TRACEBACK', format_exc())
     207          else:
     208              try:
     209                  result = func(c, *args, **kwds)
     210              except Exception:
     211                  msg = ('#TRACEBACK', format_exc())
     212              else:
     213                  msg = ('#RETURN', result)
     214  
     215          try:
     216              c.send(msg)
     217          except Exception as e:
     218              try:
     219                  c.send(('#TRACEBACK', format_exc()))
     220              except Exception:
     221                  pass
     222              util.info('Failure to send message: %r', msg)
     223              util.info(' ... request was %r', request)
     224              util.info(' ... exception was %r', e)
     225  
     226      def handle_request(self, conn):
     227          '''
     228          Handle a new connection
     229          '''
     230          try:
     231              self._handle_request(conn)
     232          except SystemExit:
     233              # Server.serve_client() calls sys.exit(0) on EOF
     234              pass
     235          finally:
     236              conn.close()
     237  
     238      def serve_client(self, conn):
     239          '''
     240          Handle requests from the proxies in a particular process/thread
     241          '''
     242          util.debug('starting server thread to service %r',
     243                     threading.current_thread().name)
     244  
     245          recv = conn.recv
     246          send = conn.send
     247          id_to_obj = self.id_to_obj
     248  
     249          while not self.stop_event.is_set():
     250  
     251              try:
     252                  methodname = obj = None
     253                  request = recv()
     254                  ident, methodname, args, kwds = request
     255                  try:
     256                      obj, exposed, gettypeid = id_to_obj[ident]
     257                  except KeyError as ke:
     258                      try:
     259                          obj, exposed, gettypeid = \
     260                              self.id_to_local_proxy_obj[ident]
     261                      except KeyError:
     262                          raise ke
     263  
     264                  if methodname not in exposed:
     265                      raise AttributeError(
     266                          'method %r of %r object is not in exposed=%r' %
     267                          (methodname, type(obj), exposed)
     268                          )
     269  
     270                  function = getattr(obj, methodname)
     271  
     272                  try:
     273                      res = function(*args, **kwds)
     274                  except Exception as e:
     275                      msg = ('#ERROR', e)
     276                  else:
     277                      typeid = gettypeid and gettypeid.get(methodname, None)
     278                      if typeid:
     279                          rident, rexposed = self.create(conn, typeid, res)
     280                          token = Token(typeid, self.address, rident)
     281                          msg = ('#PROXY', (rexposed, token))
     282                      else:
     283                          msg = ('#RETURN', res)
     284  
     285              except AttributeError:
     286                  if methodname is None:
     287                      msg = ('#TRACEBACK', format_exc())
     288                  else:
     289                      try:
     290                          fallback_func = self.fallback_mapping[methodname]
     291                          result = fallback_func(
     292                              self, conn, ident, obj, *args, **kwds
     293                              )
     294                          msg = ('#RETURN', result)
     295                      except Exception:
     296                          msg = ('#TRACEBACK', format_exc())
     297  
     298              except EOFError:
     299                  util.debug('got EOF -- exiting thread serving %r',
     300                             threading.current_thread().name)
     301                  sys.exit(0)
     302  
     303              except Exception:
     304                  msg = ('#TRACEBACK', format_exc())
     305  
     306              try:
     307                  try:
     308                      send(msg)
     309                  except Exception:
     310                      send(('#UNSERIALIZABLE', format_exc()))
     311              except Exception as e:
     312                  util.info('exception in thread serving %r',
     313                          threading.current_thread().name)
     314                  util.info(' ... message was %r', msg)
     315                  util.info(' ... exception was %r', e)
     316                  conn.close()
     317                  sys.exit(1)
     318  
     319      def fallback_getvalue(self, conn, ident, obj):
     320          return obj
     321  
     322      def fallback_str(self, conn, ident, obj):
     323          return str(obj)
     324  
     325      def fallback_repr(self, conn, ident, obj):
     326          return repr(obj)
     327  
     328      fallback_mapping = {
     329          '__str__':fallback_str,
     330          '__repr__':fallback_repr,
     331          '#GETVALUE':fallback_getvalue
     332          }
     333  
     334      def dummy(self, c):
     335          pass
     336  
     337      def debug_info(self, c):
     338          '''
     339          Return some info --- useful to spot problems with refcounting
     340          '''
     341          # Perhaps include debug info about 'c'?
     342          with self.mutex:
     343              result = []
     344              keys = list(self.id_to_refcount.keys())
     345              keys.sort()
     346              for ident in keys:
     347                  if ident != '0':
     348                      result.append('  %s:       refcount=%s\n    %s' %
     349                                    (ident, self.id_to_refcount[ident],
     350                                     str(self.id_to_obj[ident][0])[:75]))
     351              return '\n'.join(result)
     352  
     353      def number_of_objects(self, c):
     354          '''
     355          Number of shared objects
     356          '''
     357          # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
     358          return len(self.id_to_refcount)
     359  
     360      def shutdown(self, c):
     361          '''
     362          Shutdown this process
     363          '''
     364          try:
     365              util.debug('manager received shutdown message')
     366              c.send(('#RETURN', None))
     367          except:
     368              import traceback
     369              traceback.print_exc()
     370          finally:
     371              self.stop_event.set()
     372  
     373      def create(self, c, typeid, /, *args, **kwds):
     374          '''
     375          Create a new shared object and return its id
     376          '''
     377          with self.mutex:
     378              callable, exposed, method_to_typeid, proxytype = \
     379                        self.registry[typeid]
     380  
     381              if callable is None:
     382                  if kwds or (len(args) != 1):
     383                      raise ValueError(
     384                          "Without callable, must have one non-keyword argument")
     385                  obj = args[0]
     386              else:
     387                  obj = callable(*args, **kwds)
     388  
     389              if exposed is None:
     390                  exposed = public_methods(obj)
     391              if method_to_typeid is not None:
     392                  if not isinstance(method_to_typeid, dict):
     393                      raise TypeError(
     394                          "Method_to_typeid {0!r}: type {1!s}, not dict".format(
     395                              method_to_typeid, type(method_to_typeid)))
     396                  exposed = list(exposed) + list(method_to_typeid)
     397  
     398              ident = '%x' % id(obj)  # convert to string because xmlrpclib
     399                                      # only has 32 bit signed integers
     400              util.debug('%r callable returned object with id %r', typeid, ident)
     401  
     402              self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
     403              if ident not in self.id_to_refcount:
     404                  self.id_to_refcount[ident] = 0
     405  
     406          self.incref(c, ident)
     407          return ident, tuple(exposed)
     408  
     409      def get_methods(self, c, token):
     410          '''
     411          Return the methods of the shared object indicated by token
     412          '''
     413          return tuple(self.id_to_obj[token.id][1])
     414  
     415      def accept_connection(self, c, name):
     416          '''
     417          Spawn a new thread to serve this connection
     418          '''
     419          threading.current_thread().name = name
     420          c.send(('#RETURN', None))
     421          self.serve_client(c)
     422  
     423      def incref(self, c, ident):
     424          with self.mutex:
     425              try:
     426                  self.id_to_refcount[ident] += 1
     427              except KeyError as ke:
     428                  # If no external references exist but an internal (to the
     429                  # manager) still does and a new external reference is created
     430                  # from it, restore the manager's tracking of it from the
     431                  # previously stashed internal ref.
     432                  if ident in self.id_to_local_proxy_obj:
     433                      self.id_to_refcount[ident] = 1
     434                      self.id_to_obj[ident] = \
     435                          self.id_to_local_proxy_obj[ident]
     436                      obj, exposed, gettypeid = self.id_to_obj[ident]
     437                      util.debug('Server re-enabled tracking & INCREF %r', ident)
     438                  else:
     439                      raise ke
     440  
     441      def decref(self, c, ident):
     442          if ident not in self.id_to_refcount and \
     443              ident in self.id_to_local_proxy_obj:
     444              util.debug('Server DECREF skipping %r', ident)
     445              return
     446  
     447          with self.mutex:
     448              if self.id_to_refcount[ident] <= 0:
     449                  raise AssertionError(
     450                      "Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format(
     451                          ident, self.id_to_obj[ident],
     452                          self.id_to_refcount[ident]))
     453              self.id_to_refcount[ident] -= 1
     454              if self.id_to_refcount[ident] == 0:
     455                  del self.id_to_refcount[ident]
     456  
     457          if ident not in self.id_to_refcount:
     458              # Two-step process in case the object turns out to contain other
     459              # proxy objects (e.g. a managed list of managed lists).
     460              # Otherwise, deleting self.id_to_obj[ident] would trigger the
     461              # deleting of the stored value (another managed object) which would
     462              # in turn attempt to acquire the mutex that is already held here.
     463              self.id_to_obj[ident] = (None, (), None)  # thread-safe
     464              util.debug('disposing of obj with id %r', ident)
     465              with self.mutex:
     466                  del self.id_to_obj[ident]
     467  
     468  
     469  #
     470  # Class to represent state of a manager
     471  #
     472  
     473  class ESC[4;38;5;81mState(ESC[4;38;5;149mobject):
     474      __slots__ = ['value']
     475      INITIAL = 0
     476      STARTED = 1
     477      SHUTDOWN = 2
     478  
     479  #
     480  # Mapping from serializer name to Listener and Client types
     481  #
     482  
     483  listener_client = {
     484      'pickle' : (connection.Listener, connection.Client),
     485      'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
     486      }
     487  
     488  #
     489  # Definition of BaseManager
     490  #
     491  
     492  class ESC[4;38;5;81mBaseManager(ESC[4;38;5;149mobject):
     493      '''
     494      Base class for managers
     495      '''
     496      _registry = {}
     497      _Server = Server
     498  
     499      def __init__(self, address=None, authkey=None, serializer='pickle',
     500                   ctx=None, *, shutdown_timeout=1.0):
     501          if authkey is None:
     502              authkey = process.current_process().authkey
     503          self._address = address     # XXX not final address if eg ('', 0)
     504          self._authkey = process.AuthenticationString(authkey)
     505          self._state = State()
     506          self._state.value = State.INITIAL
     507          self._serializer = serializer
     508          self._Listener, self._Client = listener_client[serializer]
     509          self._ctx = ctx or get_context()
     510          self._shutdown_timeout = shutdown_timeout
     511  
     512      def get_server(self):
     513          '''
     514          Return server object with serve_forever() method and address attribute
     515          '''
     516          if self._state.value != State.INITIAL:
     517              if self._state.value == State.STARTED:
     518                  raise ProcessError("Already started server")
     519              elif self._state.value == State.SHUTDOWN:
     520                  raise ProcessError("Manager has shut down")
     521              else:
     522                  raise ProcessError(
     523                      "Unknown state {!r}".format(self._state.value))
     524          return Server(self._registry, self._address,
     525                        self._authkey, self._serializer)
     526  
     527      def connect(self):
     528          '''
     529          Connect manager object to the server process
     530          '''
     531          Listener, Client = listener_client[self._serializer]
     532          conn = Client(self._address, authkey=self._authkey)
     533          dispatch(conn, None, 'dummy')
     534          self._state.value = State.STARTED
     535  
     536      def start(self, initializer=None, initargs=()):
     537          '''
     538          Spawn a server process for this manager object
     539          '''
     540          if self._state.value != State.INITIAL:
     541              if self._state.value == State.STARTED:
     542                  raise ProcessError("Already started server")
     543              elif self._state.value == State.SHUTDOWN:
     544                  raise ProcessError("Manager has shut down")
     545              else:
     546                  raise ProcessError(
     547                      "Unknown state {!r}".format(self._state.value))
     548  
     549          if initializer is not None and not callable(initializer):
     550              raise TypeError('initializer must be a callable')
     551  
     552          # pipe over which we will retrieve address of server
     553          reader, writer = connection.Pipe(duplex=False)
     554  
     555          # spawn process which runs a server
     556          self._process = self._ctx.Process(
     557              target=type(self)._run_server,
     558              args=(self._registry, self._address, self._authkey,
     559                    self._serializer, writer, initializer, initargs),
     560              )
     561          ident = ':'.join(str(i) for i in self._process._identity)
     562          self._process.name = type(self).__name__  + '-' + ident
     563          self._process.start()
     564  
     565          # get address of server
     566          writer.close()
     567          self._address = reader.recv()
     568          reader.close()
     569  
     570          # register a finalizer
     571          self._state.value = State.STARTED
     572          self.shutdown = util.Finalize(
     573              self, type(self)._finalize_manager,
     574              args=(self._process, self._address, self._authkey, self._state,
     575                    self._Client, self._shutdown_timeout),
     576              exitpriority=0
     577              )
     578  
     579      @classmethod
     580      def _run_server(cls, registry, address, authkey, serializer, writer,
     581                      initializer=None, initargs=()):
     582          '''
     583          Create a server, report its address and run it
     584          '''
     585          # bpo-36368: protect server process from KeyboardInterrupt signals
     586          signal.signal(signal.SIGINT, signal.SIG_IGN)
     587  
     588          if initializer is not None:
     589              initializer(*initargs)
     590  
     591          # create server
     592          server = cls._Server(registry, address, authkey, serializer)
     593  
     594          # inform parent process of the server's address
     595          writer.send(server.address)
     596          writer.close()
     597  
     598          # run the manager
     599          util.info('manager serving at %r', server.address)
     600          server.serve_forever()
     601  
     602      def _create(self, typeid, /, *args, **kwds):
     603          '''
     604          Create a new shared object; return the token and exposed tuple
     605          '''
     606          assert self._state.value == State.STARTED, 'server not yet started'
     607          conn = self._Client(self._address, authkey=self._authkey)
     608          try:
     609              id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
     610          finally:
     611              conn.close()
     612          return Token(typeid, self._address, id), exposed
     613  
     614      def join(self, timeout=None):
     615          '''
     616          Join the manager process (if it has been spawned)
     617          '''
     618          if self._process is not None:
     619              self._process.join(timeout)
     620              if not self._process.is_alive():
     621                  self._process = None
     622  
     623      def _debug_info(self):
     624          '''
     625          Return some info about the servers shared objects and connections
     626          '''
     627          conn = self._Client(self._address, authkey=self._authkey)
     628          try:
     629              return dispatch(conn, None, 'debug_info')
     630          finally:
     631              conn.close()
     632  
     633      def _number_of_objects(self):
     634          '''
     635          Return the number of shared objects
     636          '''
     637          conn = self._Client(self._address, authkey=self._authkey)
     638          try:
     639              return dispatch(conn, None, 'number_of_objects')
     640          finally:
     641              conn.close()
     642  
     643      def __enter__(self):
     644          if self._state.value == State.INITIAL:
     645              self.start()
     646          if self._state.value != State.STARTED:
     647              if self._state.value == State.INITIAL:
     648                  raise ProcessError("Unable to start server")
     649              elif self._state.value == State.SHUTDOWN:
     650                  raise ProcessError("Manager has shut down")
     651              else:
     652                  raise ProcessError(
     653                      "Unknown state {!r}".format(self._state.value))
     654          return self
     655  
     656      def __exit__(self, exc_type, exc_val, exc_tb):
     657          self.shutdown()
     658  
     659      @staticmethod
     660      def _finalize_manager(process, address, authkey, state, _Client,
     661                            shutdown_timeout):
     662          '''
     663          Shutdown the manager process; will be registered as a finalizer
     664          '''
     665          if process.is_alive():
     666              util.info('sending shutdown message to manager')
     667              try:
     668                  conn = _Client(address, authkey=authkey)
     669                  try:
     670                      dispatch(conn, None, 'shutdown')
     671                  finally:
     672                      conn.close()
     673              except Exception:
     674                  pass
     675  
     676              process.join(timeout=shutdown_timeout)
     677              if process.is_alive():
     678                  util.info('manager still alive')
     679                  if hasattr(process, 'terminate'):
     680                      util.info('trying to `terminate()` manager process')
     681                      process.terminate()
     682                      process.join(timeout=shutdown_timeout)
     683                      if process.is_alive():
     684                          util.info('manager still alive after terminate')
     685                          process.kill()
     686                          process.join()
     687  
     688          state.value = State.SHUTDOWN
     689          try:
     690              del BaseProxy._address_to_local[address]
     691          except KeyError:
     692              pass
     693  
     694      @property
     695      def address(self):
     696          return self._address
     697  
     698      @classmethod
     699      def register(cls, typeid, callable=None, proxytype=None, exposed=None,
     700                   method_to_typeid=None, create_method=True):
     701          '''
     702          Register a typeid with the manager type
     703          '''
     704          if '_registry' not in cls.__dict__:
     705              cls._registry = cls._registry.copy()
     706  
     707          if proxytype is None:
     708              proxytype = AutoProxy
     709  
     710          exposed = exposed or getattr(proxytype, '_exposed_', None)
     711  
     712          method_to_typeid = method_to_typeid or \
     713                             getattr(proxytype, '_method_to_typeid_', None)
     714  
     715          if method_to_typeid:
     716              for key, value in list(method_to_typeid.items()): # isinstance?
     717                  assert type(key) is str, '%r is not a string' % key
     718                  assert type(value) is str, '%r is not a string' % value
     719  
     720          cls._registry[typeid] = (
     721              callable, exposed, method_to_typeid, proxytype
     722              )
     723  
     724          if create_method:
     725              def temp(self, /, *args, **kwds):
     726                  util.debug('requesting creation of a shared %r object', typeid)
     727                  token, exp = self._create(typeid, *args, **kwds)
     728                  proxy = proxytype(
     729                      token, self._serializer, manager=self,
     730                      authkey=self._authkey, exposed=exp
     731                      )
     732                  conn = self._Client(token.address, authkey=self._authkey)
     733                  dispatch(conn, None, 'decref', (token.id,))
     734                  return proxy
     735              temp.__name__ = typeid
     736              setattr(cls, typeid, temp)
     737  
     738  #
     739  # Subclass of set which get cleared after a fork
     740  #
     741  
     742  class ESC[4;38;5;81mProcessLocalSet(ESC[4;38;5;149mset):
     743      def __init__(self):
     744          util.register_after_fork(self, lambda obj: obj.clear())
     745      def __reduce__(self):
     746          return type(self), ()
     747  
     748  #
     749  # Definition of BaseProxy
     750  #
     751  
     752  class ESC[4;38;5;81mBaseProxy(ESC[4;38;5;149mobject):
     753      '''
     754      A base for proxies of shared objects
     755      '''
     756      _address_to_local = {}
     757      _mutex = util.ForkAwareThreadLock()
     758  
     759      def __init__(self, token, serializer, manager=None,
     760                   authkey=None, exposed=None, incref=True, manager_owned=False):
     761          with BaseProxy._mutex:
     762              tls_idset = BaseProxy._address_to_local.get(token.address, None)
     763              if tls_idset is None:
     764                  tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
     765                  BaseProxy._address_to_local[token.address] = tls_idset
     766  
     767          # self._tls is used to record the connection used by this
     768          # thread to communicate with the manager at token.address
     769          self._tls = tls_idset[0]
     770  
     771          # self._idset is used to record the identities of all shared
     772          # objects for which the current process owns references and
     773          # which are in the manager at token.address
     774          self._idset = tls_idset[1]
     775  
     776          self._token = token
     777          self._id = self._token.id
     778          self._manager = manager
     779          self._serializer = serializer
     780          self._Client = listener_client[serializer][1]
     781  
     782          # Should be set to True only when a proxy object is being created
     783          # on the manager server; primary use case: nested proxy objects.
     784          # RebuildProxy detects when a proxy is being created on the manager
     785          # and sets this value appropriately.
     786          self._owned_by_manager = manager_owned
     787  
     788          if authkey is not None:
     789              self._authkey = process.AuthenticationString(authkey)
     790          elif self._manager is not None:
     791              self._authkey = self._manager._authkey
     792          else:
     793              self._authkey = process.current_process().authkey
     794  
     795          if incref:
     796              self._incref()
     797  
     798          util.register_after_fork(self, BaseProxy._after_fork)
     799  
     800      def _connect(self):
     801          util.debug('making connection to manager')
     802          name = process.current_process().name
     803          if threading.current_thread().name != 'MainThread':
     804              name += '|' + threading.current_thread().name
     805          conn = self._Client(self._token.address, authkey=self._authkey)
     806          dispatch(conn, None, 'accept_connection', (name,))
     807          self._tls.connection = conn
     808  
     809      def _callmethod(self, methodname, args=(), kwds={}):
     810          '''
     811          Try to call a method of the referent and return a copy of the result
     812          '''
     813          try:
     814              conn = self._tls.connection
     815          except AttributeError:
     816              util.debug('thread %r does not own a connection',
     817                         threading.current_thread().name)
     818              self._connect()
     819              conn = self._tls.connection
     820  
     821          conn.send((self._id, methodname, args, kwds))
     822          kind, result = conn.recv()
     823  
     824          if kind == '#RETURN':
     825              return result
     826          elif kind == '#PROXY':
     827              exposed, token = result
     828              proxytype = self._manager._registry[token.typeid][-1]
     829              token.address = self._token.address
     830              proxy = proxytype(
     831                  token, self._serializer, manager=self._manager,
     832                  authkey=self._authkey, exposed=exposed
     833                  )
     834              conn = self._Client(token.address, authkey=self._authkey)
     835              dispatch(conn, None, 'decref', (token.id,))
     836              return proxy
     837          raise convert_to_error(kind, result)
     838  
     839      def _getvalue(self):
     840          '''
     841          Get a copy of the value of the referent
     842          '''
     843          return self._callmethod('#GETVALUE')
     844  
     845      def _incref(self):
     846          if self._owned_by_manager:
     847              util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
     848              return
     849  
     850          conn = self._Client(self._token.address, authkey=self._authkey)
     851          dispatch(conn, None, 'incref', (self._id,))
     852          util.debug('INCREF %r', self._token.id)
     853  
     854          self._idset.add(self._id)
     855  
     856          state = self._manager and self._manager._state
     857  
     858          self._close = util.Finalize(
     859              self, BaseProxy._decref,
     860              args=(self._token, self._authkey, state,
     861                    self._tls, self._idset, self._Client),
     862              exitpriority=10
     863              )
     864  
     865      @staticmethod
     866      def _decref(token, authkey, state, tls, idset, _Client):
     867          idset.discard(token.id)
     868  
     869          # check whether manager is still alive
     870          if state is None or state.value == State.STARTED:
     871              # tell manager this process no longer cares about referent
     872              try:
     873                  util.debug('DECREF %r', token.id)
     874                  conn = _Client(token.address, authkey=authkey)
     875                  dispatch(conn, None, 'decref', (token.id,))
     876              except Exception as e:
     877                  util.debug('... decref failed %s', e)
     878  
     879          else:
     880              util.debug('DECREF %r -- manager already shutdown', token.id)
     881  
     882          # check whether we can close this thread's connection because
     883          # the process owns no more references to objects for this manager
     884          if not idset and hasattr(tls, 'connection'):
     885              util.debug('thread %r has no more proxies so closing conn',
     886                         threading.current_thread().name)
     887              tls.connection.close()
     888              del tls.connection
     889  
     890      def _after_fork(self):
     891          self._manager = None
     892          try:
     893              self._incref()
     894          except Exception as e:
     895              # the proxy may just be for a manager which has shutdown
     896              util.info('incref failed: %s' % e)
     897  
     898      def __reduce__(self):
     899          kwds = {}
     900          if get_spawning_popen() is not None:
     901              kwds['authkey'] = self._authkey
     902  
     903          if getattr(self, '_isauto', False):
     904              kwds['exposed'] = self._exposed_
     905              return (RebuildProxy,
     906                      (AutoProxy, self._token, self._serializer, kwds))
     907          else:
     908              return (RebuildProxy,
     909                      (type(self), self._token, self._serializer, kwds))
     910  
     911      def __deepcopy__(self, memo):
     912          return self._getvalue()
     913  
     914      def __repr__(self):
     915          return '<%s object, typeid %r at %#x>' % \
     916                 (type(self).__name__, self._token.typeid, id(self))
     917  
     918      def __str__(self):
     919          '''
     920          Return representation of the referent (or a fall-back if that fails)
     921          '''
     922          try:
     923              return self._callmethod('__repr__')
     924          except Exception:
     925              return repr(self)[:-1] + "; '__str__()' failed>"
     926  
     927  #
     928  # Function used for unpickling
     929  #
     930  
     931  def RebuildProxy(func, token, serializer, kwds):
     932      '''
     933      Function used for unpickling proxy objects.
     934      '''
     935      server = getattr(process.current_process(), '_manager_server', None)
     936      if server and server.address == token.address:
     937          util.debug('Rebuild a proxy owned by manager, token=%r', token)
     938          kwds['manager_owned'] = True
     939          if token.id not in server.id_to_local_proxy_obj:
     940              server.id_to_local_proxy_obj[token.id] = \
     941                  server.id_to_obj[token.id]
     942      incref = (
     943          kwds.pop('incref', True) and
     944          not getattr(process.current_process(), '_inheriting', False)
     945          )
     946      return func(token, serializer, incref=incref, **kwds)
     947  
     948  #
     949  # Functions to create proxies and proxy types
     950  #
     951  
     952  def MakeProxyType(name, exposed, _cache={}):
     953      '''
     954      Return a proxy type whose methods are given by `exposed`
     955      '''
     956      exposed = tuple(exposed)
     957      try:
     958          return _cache[(name, exposed)]
     959      except KeyError:
     960          pass
     961  
     962      dic = {}
     963  
     964      for meth in exposed:
     965          exec('''def %s(self, /, *args, **kwds):
     966          return self._callmethod(%r, args, kwds)''' % (meth, meth), dic)
     967  
     968      ProxyType = type(name, (BaseProxy,), dic)
     969      ProxyType._exposed_ = exposed
     970      _cache[(name, exposed)] = ProxyType
     971      return ProxyType
     972  
     973  
     974  def AutoProxy(token, serializer, manager=None, authkey=None,
     975                exposed=None, incref=True, manager_owned=False):
     976      '''
     977      Return an auto-proxy for `token`
     978      '''
     979      _Client = listener_client[serializer][1]
     980  
     981      if exposed is None:
     982          conn = _Client(token.address, authkey=authkey)
     983          try:
     984              exposed = dispatch(conn, None, 'get_methods', (token,))
     985          finally:
     986              conn.close()
     987  
     988      if authkey is None and manager is not None:
     989          authkey = manager._authkey
     990      if authkey is None:
     991          authkey = process.current_process().authkey
     992  
     993      ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
     994      proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
     995                        incref=incref, manager_owned=manager_owned)
     996      proxy._isauto = True
     997      return proxy
     998  
     999  #
    1000  # Types/callables which we will register with SyncManager
    1001  #
    1002  
    1003  class ESC[4;38;5;81mNamespace(ESC[4;38;5;149mobject):
    1004      def __init__(self, /, **kwds):
    1005          self.__dict__.update(kwds)
    1006      def __repr__(self):
    1007          items = list(self.__dict__.items())
    1008          temp = []
    1009          for name, value in items:
    1010              if not name.startswith('_'):
    1011                  temp.append('%s=%r' % (name, value))
    1012          temp.sort()
    1013          return '%s(%s)' % (self.__class__.__name__, ', '.join(temp))
    1014  
    1015  class ESC[4;38;5;81mValue(ESC[4;38;5;149mobject):
    1016      def __init__(self, typecode, value, lock=True):
    1017          self._typecode = typecode
    1018          self._value = value
    1019      def get(self):
    1020          return self._value
    1021      def set(self, value):
    1022          self._value = value
    1023      def __repr__(self):
    1024          return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
    1025      value = property(get, set)
    1026  
    1027  def Array(typecode, sequence, lock=True):
    1028      return array.array(typecode, sequence)
    1029  
    1030  #
    1031  # Proxy types used by SyncManager
    1032  #
    1033  
    1034  class ESC[4;38;5;81mIteratorProxy(ESC[4;38;5;149mBaseProxy):
    1035      _exposed_ = ('__next__', 'send', 'throw', 'close')
    1036      def __iter__(self):
    1037          return self
    1038      def __next__(self, *args):
    1039          return self._callmethod('__next__', args)
    1040      def send(self, *args):
    1041          return self._callmethod('send', args)
    1042      def throw(self, *args):
    1043          return self._callmethod('throw', args)
    1044      def close(self, *args):
    1045          return self._callmethod('close', args)
    1046  
    1047  
    1048  class ESC[4;38;5;81mAcquirerProxy(ESC[4;38;5;149mBaseProxy):
    1049      _exposed_ = ('acquire', 'release')
    1050      def acquire(self, blocking=True, timeout=None):
    1051          args = (blocking,) if timeout is None else (blocking, timeout)
    1052          return self._callmethod('acquire', args)
    1053      def release(self):
    1054          return self._callmethod('release')
    1055      def __enter__(self):
    1056          return self._callmethod('acquire')
    1057      def __exit__(self, exc_type, exc_val, exc_tb):
    1058          return self._callmethod('release')
    1059  
    1060  
    1061  class ESC[4;38;5;81mConditionProxy(ESC[4;38;5;149mAcquirerProxy):
    1062      _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
    1063      def wait(self, timeout=None):
    1064          return self._callmethod('wait', (timeout,))
    1065      def notify(self, n=1):
    1066          return self._callmethod('notify', (n,))
    1067      def notify_all(self):
    1068          return self._callmethod('notify_all')
    1069      def wait_for(self, predicate, timeout=None):
    1070          result = predicate()
    1071          if result:
    1072              return result
    1073          if timeout is not None:
    1074              endtime = time.monotonic() + timeout
    1075          else:
    1076              endtime = None
    1077              waittime = None
    1078          while not result:
    1079              if endtime is not None:
    1080                  waittime = endtime - time.monotonic()
    1081                  if waittime <= 0:
    1082                      break
    1083              self.wait(waittime)
    1084              result = predicate()
    1085          return result
    1086  
    1087  
    1088  class ESC[4;38;5;81mEventProxy(ESC[4;38;5;149mBaseProxy):
    1089      _exposed_ = ('is_set', 'set', 'clear', 'wait')
    1090      def is_set(self):
    1091          return self._callmethod('is_set')
    1092      def set(self):
    1093          return self._callmethod('set')
    1094      def clear(self):
    1095          return self._callmethod('clear')
    1096      def wait(self, timeout=None):
    1097          return self._callmethod('wait', (timeout,))
    1098  
    1099  
    1100  class ESC[4;38;5;81mBarrierProxy(ESC[4;38;5;149mBaseProxy):
    1101      _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')
    1102      def wait(self, timeout=None):
    1103          return self._callmethod('wait', (timeout,))
    1104      def abort(self):
    1105          return self._callmethod('abort')
    1106      def reset(self):
    1107          return self._callmethod('reset')
    1108      @property
    1109      def parties(self):
    1110          return self._callmethod('__getattribute__', ('parties',))
    1111      @property
    1112      def n_waiting(self):
    1113          return self._callmethod('__getattribute__', ('n_waiting',))
    1114      @property
    1115      def broken(self):
    1116          return self._callmethod('__getattribute__', ('broken',))
    1117  
    1118  
    1119  class ESC[4;38;5;81mNamespaceProxy(ESC[4;38;5;149mBaseProxy):
    1120      _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
    1121      def __getattr__(self, key):
    1122          if key[0] == '_':
    1123              return object.__getattribute__(self, key)
    1124          callmethod = object.__getattribute__(self, '_callmethod')
    1125          return callmethod('__getattribute__', (key,))
    1126      def __setattr__(self, key, value):
    1127          if key[0] == '_':
    1128              return object.__setattr__(self, key, value)
    1129          callmethod = object.__getattribute__(self, '_callmethod')
    1130          return callmethod('__setattr__', (key, value))
    1131      def __delattr__(self, key):
    1132          if key[0] == '_':
    1133              return object.__delattr__(self, key)
    1134          callmethod = object.__getattribute__(self, '_callmethod')
    1135          return callmethod('__delattr__', (key,))
    1136  
    1137  
    1138  class ESC[4;38;5;81mValueProxy(ESC[4;38;5;149mBaseProxy):
    1139      _exposed_ = ('get', 'set')
    1140      def get(self):
    1141          return self._callmethod('get')
    1142      def set(self, value):
    1143          return self._callmethod('set', (value,))
    1144      value = property(get, set)
    1145  
    1146      __class_getitem__ = classmethod(types.GenericAlias)
    1147  
    1148  
    1149  BaseListProxy = MakeProxyType('BaseListProxy', (
    1150      '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
    1151      '__mul__', '__reversed__', '__rmul__', '__setitem__',
    1152      'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
    1153      'reverse', 'sort', '__imul__'
    1154      ))
    1155  class ESC[4;38;5;81mListProxy(ESC[4;38;5;149mBaseListProxy):
    1156      def __iadd__(self, value):
    1157          self._callmethod('extend', (value,))
    1158          return self
    1159      def __imul__(self, value):
    1160          self._callmethod('__imul__', (value,))
    1161          return self
    1162  
    1163  
    1164  DictProxy = MakeProxyType('DictProxy', (
    1165      '__contains__', '__delitem__', '__getitem__', '__iter__', '__len__',
    1166      '__setitem__', 'clear', 'copy', 'get', 'items',
    1167      'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
    1168      ))
    1169  DictProxy._method_to_typeid_ = {
    1170      '__iter__': 'Iterator',
    1171      }
    1172  
    1173  
    1174  ArrayProxy = MakeProxyType('ArrayProxy', (
    1175      '__len__', '__getitem__', '__setitem__'
    1176      ))
    1177  
    1178  
    1179  BasePoolProxy = MakeProxyType('PoolProxy', (
    1180      'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
    1181      'map', 'map_async', 'starmap', 'starmap_async', 'terminate',
    1182      ))
    1183  BasePoolProxy._method_to_typeid_ = {
    1184      'apply_async': 'AsyncResult',
    1185      'map_async': 'AsyncResult',
    1186      'starmap_async': 'AsyncResult',
    1187      'imap': 'Iterator',
    1188      'imap_unordered': 'Iterator'
    1189      }
    1190  class ESC[4;38;5;81mPoolProxy(ESC[4;38;5;149mBasePoolProxy):
    1191      def __enter__(self):
    1192          return self
    1193      def __exit__(self, exc_type, exc_val, exc_tb):
    1194          self.terminate()
    1195  
    1196  #
    1197  # Definition of SyncManager
    1198  #
    1199  
    1200  class ESC[4;38;5;81mSyncManager(ESC[4;38;5;149mBaseManager):
    1201      '''
    1202      Subclass of `BaseManager` which supports a number of shared object types.
    1203  
    1204      The types registered are those intended for the synchronization
    1205      of threads, plus `dict`, `list` and `Namespace`.
    1206  
    1207      The `multiprocessing.Manager()` function creates started instances of
    1208      this class.
    1209      '''
    1210  
    1211  SyncManager.register('Queue', queue.Queue)
    1212  SyncManager.register('JoinableQueue', queue.Queue)
    1213  SyncManager.register('Event', threading.Event, EventProxy)
    1214  SyncManager.register('Lock', threading.Lock, AcquirerProxy)
    1215  SyncManager.register('RLock', threading.RLock, AcquirerProxy)
    1216  SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
    1217  SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
    1218                       AcquirerProxy)
    1219  SyncManager.register('Condition', threading.Condition, ConditionProxy)
    1220  SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
    1221  SyncManager.register('Pool', pool.Pool, PoolProxy)
    1222  SyncManager.register('list', list, ListProxy)
    1223  SyncManager.register('dict', dict, DictProxy)
    1224  SyncManager.register('Value', Value, ValueProxy)
    1225  SyncManager.register('Array', Array, ArrayProxy)
    1226  SyncManager.register('Namespace', Namespace, NamespaceProxy)
    1227  
    1228  # types returned by methods of PoolProxy
    1229  SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
    1230  SyncManager.register('AsyncResult', create_method=False)
    1231  
    1232  #
    1233  # Definition of SharedMemoryManager and SharedMemoryServer
    1234  #
    1235  
    1236  if HAS_SHMEM:
    1237      class ESC[4;38;5;81m_SharedMemoryTracker:
    1238          "Manages one or more shared memory segments."
    1239  
    1240          def __init__(self, name, segment_names=[]):
    1241              self.shared_memory_context_name = name
    1242              self.segment_names = segment_names
    1243  
    1244          def register_segment(self, segment_name):
    1245              "Adds the supplied shared memory block name to tracker."
    1246              util.debug(f"Register segment {segment_name!r} in pid {getpid()}")
    1247              self.segment_names.append(segment_name)
    1248  
    1249          def destroy_segment(self, segment_name):
    1250              """Calls unlink() on the shared memory block with the supplied name
    1251              and removes it from the list of blocks being tracked."""
    1252              util.debug(f"Destroy segment {segment_name!r} in pid {getpid()}")
    1253              self.segment_names.remove(segment_name)
    1254              segment = shared_memory.SharedMemory(segment_name)
    1255              segment.close()
    1256              segment.unlink()
    1257  
    1258          def unlink(self):
    1259              "Calls destroy_segment() on all tracked shared memory blocks."
    1260              for segment_name in self.segment_names[:]:
    1261                  self.destroy_segment(segment_name)
    1262  
    1263          def __del__(self):
    1264              util.debug(f"Call {self.__class__.__name__}.__del__ in {getpid()}")
    1265              self.unlink()
    1266  
    1267          def __getstate__(self):
    1268              return (self.shared_memory_context_name, self.segment_names)
    1269  
    1270          def __setstate__(self, state):
    1271              self.__init__(*state)
    1272  
    1273  
    1274      class ESC[4;38;5;81mSharedMemoryServer(ESC[4;38;5;149mServer):
    1275  
    1276          public = Server.public + \
    1277                   ['track_segment', 'release_segment', 'list_segments']
    1278  
    1279          def __init__(self, *args, **kwargs):
    1280              Server.__init__(self, *args, **kwargs)
    1281              address = self.address
    1282              # The address of Linux abstract namespaces can be bytes
    1283              if isinstance(address, bytes):
    1284                  address = os.fsdecode(address)
    1285              self.shared_memory_context = \
    1286                  _SharedMemoryTracker(f"shm_{address}_{getpid()}")
    1287              util.debug(f"SharedMemoryServer started by pid {getpid()}")
    1288  
    1289          def create(self, c, typeid, /, *args, **kwargs):
    1290              """Create a new distributed-shared object (not backed by a shared
    1291              memory block) and return its id to be used in a Proxy Object."""
    1292              # Unless set up as a shared proxy, don't make shared_memory_context
    1293              # a standard part of kwargs.  This makes things easier for supplying
    1294              # simple functions.
    1295              if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"):
    1296                  kwargs['shared_memory_context'] = self.shared_memory_context
    1297              return Server.create(self, c, typeid, *args, **kwargs)
    1298  
    1299          def shutdown(self, c):
    1300              "Call unlink() on all tracked shared memory, terminate the Server."
    1301              self.shared_memory_context.unlink()
    1302              return Server.shutdown(self, c)
    1303  
    1304          def track_segment(self, c, segment_name):
    1305              "Adds the supplied shared memory block name to Server's tracker."
    1306              self.shared_memory_context.register_segment(segment_name)
    1307  
    1308          def release_segment(self, c, segment_name):
    1309              """Calls unlink() on the shared memory block with the supplied name
    1310              and removes it from the tracker instance inside the Server."""
    1311              self.shared_memory_context.destroy_segment(segment_name)
    1312  
    1313          def list_segments(self, c):
    1314              """Returns a list of names of shared memory blocks that the Server
    1315              is currently tracking."""
    1316              return self.shared_memory_context.segment_names
    1317  
    1318  
    1319      class ESC[4;38;5;81mSharedMemoryManager(ESC[4;38;5;149mBaseManager):
    1320          """Like SyncManager but uses SharedMemoryServer instead of Server.
    1321  
    1322          It provides methods for creating and returning SharedMemory instances
    1323          and for creating a list-like object (ShareableList) backed by shared
    1324          memory.  It also provides methods that create and return Proxy Objects
    1325          that support synchronization across processes (i.e. multi-process-safe
    1326          locks and semaphores).
    1327          """
    1328  
    1329          _Server = SharedMemoryServer
    1330  
    1331          def __init__(self, *args, **kwargs):
    1332              if os.name == "posix":
    1333                  # bpo-36867: Ensure the resource_tracker is running before
    1334                  # launching the manager process, so that concurrent
    1335                  # shared_memory manipulation both in the manager and in the
    1336                  # current process does not create two resource_tracker
    1337                  # processes.
    1338                  from . import resource_tracker
    1339                  resource_tracker.ensure_running()
    1340              BaseManager.__init__(self, *args, **kwargs)
    1341              util.debug(f"{self.__class__.__name__} created by pid {getpid()}")
    1342  
    1343          def __del__(self):
    1344              util.debug(f"{self.__class__.__name__}.__del__ by pid {getpid()}")
    1345  
    1346          def get_server(self):
    1347              'Better than monkeypatching for now; merge into Server ultimately'
    1348              if self._state.value != State.INITIAL:
    1349                  if self._state.value == State.STARTED:
    1350                      raise ProcessError("Already started SharedMemoryServer")
    1351                  elif self._state.value == State.SHUTDOWN:
    1352                      raise ProcessError("SharedMemoryManager has shut down")
    1353                  else:
    1354                      raise ProcessError(
    1355                          "Unknown state {!r}".format(self._state.value))
    1356              return self._Server(self._registry, self._address,
    1357                                  self._authkey, self._serializer)
    1358  
    1359          def SharedMemory(self, size):
    1360              """Returns a new SharedMemory instance with the specified size in
    1361              bytes, to be tracked by the manager."""
    1362              with self._Client(self._address, authkey=self._authkey) as conn:
    1363                  sms = shared_memory.SharedMemory(None, create=True, size=size)
    1364                  try:
    1365                      dispatch(conn, None, 'track_segment', (sms.name,))
    1366                  except BaseException as e:
    1367                      sms.unlink()
    1368                      raise e
    1369              return sms
    1370  
    1371          def ShareableList(self, sequence):
    1372              """Returns a new ShareableList instance populated with the values
    1373              from the input sequence, to be tracked by the manager."""
    1374              with self._Client(self._address, authkey=self._authkey) as conn:
    1375                  sl = shared_memory.ShareableList(sequence)
    1376                  try:
    1377                      dispatch(conn, None, 'track_segment', (sl.shm.name,))
    1378                  except BaseException as e:
    1379                      sl.shm.unlink()
    1380                      raise e
    1381              return sl