(root)/
Python-3.11.7/
Lib/
multiprocessing/
reduction.py
       1  #
       2  # Module which deals with pickling of objects.
       3  #
       4  # multiprocessing/reduction.py
       5  #
       6  # Copyright (c) 2006-2008, R Oudkerk
       7  # Licensed to PSF under a Contributor Agreement.
       8  #
       9  
      10  from abc import ABCMeta
      11  import copyreg
      12  import functools
      13  import io
      14  import os
      15  import pickle
      16  import socket
      17  import sys
      18  
      19  from . import context
      20  
      21  __all__ = ['send_handle', 'recv_handle', 'ForkingPickler', 'register', 'dump']
      22  
      23  
      24  HAVE_SEND_HANDLE = (sys.platform == 'win32' or
      25                      (hasattr(socket, 'CMSG_LEN') and
      26                       hasattr(socket, 'SCM_RIGHTS') and
      27                       hasattr(socket.socket, 'sendmsg')))
      28  
      29  #
      30  # Pickler subclass
      31  #
      32  
      33  class ESC[4;38;5;81mForkingPickler(ESC[4;38;5;149mpickleESC[4;38;5;149m.ESC[4;38;5;149mPickler):
      34      '''Pickler subclass used by multiprocessing.'''
      35      _extra_reducers = {}
      36      _copyreg_dispatch_table = copyreg.dispatch_table
      37  
      38      def __init__(self, *args):
      39          super().__init__(*args)
      40          self.dispatch_table = self._copyreg_dispatch_table.copy()
      41          self.dispatch_table.update(self._extra_reducers)
      42  
      43      @classmethod
      44      def register(cls, type, reduce):
      45          '''Register a reduce function for a type.'''
      46          cls._extra_reducers[type] = reduce
      47  
      48      @classmethod
      49      def dumps(cls, obj, protocol=None):
      50          buf = io.BytesIO()
      51          cls(buf, protocol).dump(obj)
      52          return buf.getbuffer()
      53  
      54      loads = pickle.loads
      55  
      56  register = ForkingPickler.register
      57  
      58  def dump(obj, file, protocol=None):
      59      '''Replacement for pickle.dump() using ForkingPickler.'''
      60      ForkingPickler(file, protocol).dump(obj)
      61  
      62  #
      63  # Platform specific definitions
      64  #
      65  
      66  if sys.platform == 'win32':
      67      # Windows
      68      __all__ += ['DupHandle', 'duplicate', 'steal_handle']
      69      import _winapi
      70  
      71      def duplicate(handle, target_process=None, inheritable=False,
      72                    *, source_process=None):
      73          '''Duplicate a handle.  (target_process is a handle not a pid!)'''
      74          current_process = _winapi.GetCurrentProcess()
      75          if source_process is None:
      76              source_process = current_process
      77          if target_process is None:
      78              target_process = current_process
      79          return _winapi.DuplicateHandle(
      80              source_process, handle, target_process,
      81              0, inheritable, _winapi.DUPLICATE_SAME_ACCESS)
      82  
      83      def steal_handle(source_pid, handle):
      84          '''Steal a handle from process identified by source_pid.'''
      85          source_process_handle = _winapi.OpenProcess(
      86              _winapi.PROCESS_DUP_HANDLE, False, source_pid)
      87          try:
      88              return _winapi.DuplicateHandle(
      89                  source_process_handle, handle,
      90                  _winapi.GetCurrentProcess(), 0, False,
      91                  _winapi.DUPLICATE_SAME_ACCESS | _winapi.DUPLICATE_CLOSE_SOURCE)
      92          finally:
      93              _winapi.CloseHandle(source_process_handle)
      94  
      95      def send_handle(conn, handle, destination_pid):
      96          '''Send a handle over a local connection.'''
      97          dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid)
      98          conn.send(dh)
      99  
     100      def recv_handle(conn):
     101          '''Receive a handle over a local connection.'''
     102          return conn.recv().detach()
     103  
     104      class ESC[4;38;5;81mDupHandle(ESC[4;38;5;149mobject):
     105          '''Picklable wrapper for a handle.'''
     106          def __init__(self, handle, access, pid=None):
     107              if pid is None:
     108                  # We just duplicate the handle in the current process and
     109                  # let the receiving process steal the handle.
     110                  pid = os.getpid()
     111              proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
     112              try:
     113                  self._handle = _winapi.DuplicateHandle(
     114                      _winapi.GetCurrentProcess(),
     115                      handle, proc, access, False, 0)
     116              finally:
     117                  _winapi.CloseHandle(proc)
     118              self._access = access
     119              self._pid = pid
     120  
     121          def detach(self):
     122              '''Get the handle.  This should only be called once.'''
     123              # retrieve handle from process which currently owns it
     124              if self._pid == os.getpid():
     125                  # The handle has already been duplicated for this process.
     126                  return self._handle
     127              # We must steal the handle from the process whose pid is self._pid.
     128              proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False,
     129                                         self._pid)
     130              try:
     131                  return _winapi.DuplicateHandle(
     132                      proc, self._handle, _winapi.GetCurrentProcess(),
     133                      self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE)
     134              finally:
     135                  _winapi.CloseHandle(proc)
     136  
     137  else:
     138      # Unix
     139      __all__ += ['DupFd', 'sendfds', 'recvfds']
     140      import array
     141  
     142      # On MacOSX we should acknowledge receipt of fds -- see Issue14669
     143      ACKNOWLEDGE = sys.platform == 'darwin'
     144  
     145      def sendfds(sock, fds):
     146          '''Send an array of fds over an AF_UNIX socket.'''
     147          fds = array.array('i', fds)
     148          msg = bytes([len(fds) % 256])
     149          sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])
     150          if ACKNOWLEDGE and sock.recv(1) != b'A':
     151              raise RuntimeError('did not receive acknowledgement of fd')
     152  
     153      def recvfds(sock, size):
     154          '''Receive an array of fds over an AF_UNIX socket.'''
     155          a = array.array('i')
     156          bytes_size = a.itemsize * size
     157          msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_SPACE(bytes_size))
     158          if not msg and not ancdata:
     159              raise EOFError
     160          try:
     161              if ACKNOWLEDGE:
     162                  sock.send(b'A')
     163              if len(ancdata) != 1:
     164                  raise RuntimeError('received %d items of ancdata' %
     165                                     len(ancdata))
     166              cmsg_level, cmsg_type, cmsg_data = ancdata[0]
     167              if (cmsg_level == socket.SOL_SOCKET and
     168                  cmsg_type == socket.SCM_RIGHTS):
     169                  if len(cmsg_data) % a.itemsize != 0:
     170                      raise ValueError
     171                  a.frombytes(cmsg_data)
     172                  if len(a) % 256 != msg[0]:
     173                      raise AssertionError(
     174                          "Len is {0:n} but msg[0] is {1!r}".format(
     175                              len(a), msg[0]))
     176                  return list(a)
     177          except (ValueError, IndexError):
     178              pass
     179          raise RuntimeError('Invalid data received')
     180  
     181      def send_handle(conn, handle, destination_pid):
     182          '''Send a handle over a local connection.'''
     183          with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
     184              sendfds(s, [handle])
     185  
     186      def recv_handle(conn):
     187          '''Receive a handle over a local connection.'''
     188          with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
     189              return recvfds(s, 1)[0]
     190  
     191      def DupFd(fd):
     192          '''Return a wrapper for an fd.'''
     193          popen_obj = context.get_spawning_popen()
     194          if popen_obj is not None:
     195              return popen_obj.DupFd(popen_obj.duplicate_for_child(fd))
     196          elif HAVE_SEND_HANDLE:
     197              from . import resource_sharer
     198              return resource_sharer.DupFd(fd)
     199          else:
     200              raise ValueError('SCM_RIGHTS appears not to be available')
     201  
     202  #
     203  # Try making some callable types picklable
     204  #
     205  
     206  def _reduce_method(m):
     207      if m.__self__ is None:
     208          return getattr, (m.__class__, m.__func__.__name__)
     209      else:
     210          return getattr, (m.__self__, m.__func__.__name__)
     211  class ESC[4;38;5;81m_C:
     212      def f(self):
     213          pass
     214  register(type(_C().f), _reduce_method)
     215  
     216  
     217  def _reduce_method_descriptor(m):
     218      return getattr, (m.__objclass__, m.__name__)
     219  register(type(list.append), _reduce_method_descriptor)
     220  register(type(int.__add__), _reduce_method_descriptor)
     221  
     222  
     223  def _reduce_partial(p):
     224      return _rebuild_partial, (p.func, p.args, p.keywords or {})
     225  def _rebuild_partial(func, args, keywords):
     226      return functools.partial(func, *args, **keywords)
     227  register(functools.partial, _reduce_partial)
     228  
     229  #
     230  # Make sockets picklable
     231  #
     232  
     233  if sys.platform == 'win32':
     234      def _reduce_socket(s):
     235          from .resource_sharer import DupSocket
     236          return _rebuild_socket, (DupSocket(s),)
     237      def _rebuild_socket(ds):
     238          return ds.detach()
     239      register(socket.socket, _reduce_socket)
     240  
     241  else:
     242      def _reduce_socket(s):
     243          df = DupFd(s.fileno())
     244          return _rebuild_socket, (df, s.family, s.type, s.proto)
     245      def _rebuild_socket(df, family, type, proto):
     246          fd = df.detach()
     247          return socket.socket(family, type, proto, fileno=fd)
     248      register(socket.socket, _reduce_socket)
     249  
     250  
     251  class ESC[4;38;5;81mAbstractReducer(metaclass=ESC[4;38;5;149mABCMeta):
     252      '''Abstract base class for use in implementing a Reduction class
     253      suitable for use in replacing the standard reduction mechanism
     254      used in multiprocessing.'''
     255      ForkingPickler = ForkingPickler
     256      register = register
     257      dump = dump
     258      send_handle = send_handle
     259      recv_handle = recv_handle
     260  
     261      if sys.platform == 'win32':
     262          steal_handle = steal_handle
     263          duplicate = duplicate
     264          DupHandle = DupHandle
     265      else:
     266          sendfds = sendfds
     267          recvfds = recvfds
     268          DupFd = DupFd
     269  
     270      _reduce_method = _reduce_method
     271      _reduce_method_descriptor = _reduce_method_descriptor
     272      _rebuild_partial = _rebuild_partial
     273      _reduce_socket = _reduce_socket
     274      _rebuild_socket = _rebuild_socket
     275  
     276      def __init__(self, *args):
     277          register(type(_C().f), _reduce_method)
     278          register(type(list.append), _reduce_method_descriptor)
     279          register(type(int.__add__), _reduce_method_descriptor)
     280          register(functools.partial, _reduce_partial)
     281          register(socket.socket, _reduce_socket)