python (3.11.7)
       1  import os
       2  import pathlib
       3  import tempfile
       4  import functools
       5  import contextlib
       6  import types
       7  import importlib
       8  
       9  from typing import Union, Optional
      10  from .abc import ResourceReader, Traversable
      11  
      12  from ._adapters import wrap_spec
      13  
# A package may be referred to either by an already-imported module object
# or by its import name given as a string.
Package = Union[types.ModuleType, str]
      15  
      16  
      17  def files(package):
      18      # type: (Package) -> Traversable
      19      """
      20      Get a Traversable resource from a package
      21      """
      22      return from_package(get_package(package))
      23  
      24  
      25  def get_resource_reader(package):
      26      # type: (types.ModuleType) -> Optional[ResourceReader]
      27      """
      28      Return the package's loader if it's a ResourceReader.
      29      """
      30      # We can't use
      31      # a issubclass() check here because apparently abc.'s __subclasscheck__()
      32      # hook wants to create a weak reference to the object, but
      33      # zipimport.zipimporter does not support weak references, resulting in a
      34      # TypeError.  That seems terrible.
      35      spec = package.__spec__
      36      reader = getattr(spec.loader, 'get_resource_reader', None)  # type: ignore
      37      if reader is None:
      38          return None
      39      return reader(spec.name)  # type: ignore
      40  
      41  
      42  def resolve(cand):
      43      # type: (Package) -> types.ModuleType
      44      return cand if isinstance(cand, types.ModuleType) else importlib.import_module(cand)
      45  
      46  
      47  def get_package(package):
      48      # type: (Package) -> types.ModuleType
      49      """Take a package name or module object and return the module.
      50  
      51      Raise an exception if the resolved module is not a package.
      52      """
      53      resolved = resolve(package)
      54      if wrap_spec(resolved).submodule_search_locations is None:
      55          raise TypeError(f'{package!r} is not a package')
      56      return resolved
      57  
      58  
      59  def from_package(package):
      60      """
      61      Return a Traversable object for the given package.
      62  
      63      """
      64      spec = wrap_spec(package)
      65      reader = spec.loader.get_resource_reader(spec.name)
      66      return reader.files()
      67  
      68  
      69  @contextlib.contextmanager
      70  def _tempfile(reader, suffix='',
      71                # gh-93353: Keep a reference to call os.remove() in late Python
      72                # finalization.
      73                *, _os_remove=os.remove):
      74      # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try'
      75      # blocks due to the need to close the temporary file to work on Windows
      76      # properly.
      77      fd, raw_path = tempfile.mkstemp(suffix=suffix)
      78      try:
      79          try:
      80              os.write(fd, reader())
      81          finally:
      82              os.close(fd)
      83          del reader
      84          yield pathlib.Path(raw_path)
      85      finally:
      86          try:
      87              _os_remove(raw_path)
      88          except FileNotFoundError:
      89              pass
      90  
      91  
      92  @functools.singledispatch
      93  def as_file(path):
      94      """
      95      Given a Traversable object, return that object as a
      96      path on the local file system in a context manager.
      97      """
      98      return _tempfile(path.read_bytes, suffix=path.name)
      99  
     100  
@as_file.register(pathlib.Path)
@contextlib.contextmanager
def _(path):
    """
    Degenerate behavior for pathlib.Path objects.

    The given path is yielded unchanged: this overload creates no temporary
    file and performs no cleanup when the context exits.
    """
    yield path
     107      yield path