(root)/
Python-3.12.0/
Lib/
importlib/
resources/
readers.py
       1  import collections
       2  import itertools
       3  import pathlib
       4  import operator
       5  import zipfile
       6  
       7  from . import abc
       8  
       9  from ._itertools import only
      10  
      11  
      12  def remove_duplicates(items):
      13      return iter(collections.OrderedDict.fromkeys(items))
      14  
      15  
      16  class ESC[4;38;5;81mFileReader(ESC[4;38;5;149mabcESC[4;38;5;149m.ESC[4;38;5;149mTraversableResources):
      17      def __init__(self, loader):
      18          self.path = pathlib.Path(loader.path).parent
      19  
      20      def resource_path(self, resource):
      21          """
      22          Return the file system path to prevent
      23          `resources.path()` from creating a temporary
      24          copy.
      25          """
      26          return str(self.path.joinpath(resource))
      27  
      28      def files(self):
      29          return self.path
      30  
      31  
      32  class ESC[4;38;5;81mZipReader(ESC[4;38;5;149mabcESC[4;38;5;149m.ESC[4;38;5;149mTraversableResources):
      33      def __init__(self, loader, module):
      34          _, _, name = module.rpartition('.')
      35          self.prefix = loader.prefix.replace('\\', '/') + name + '/'
      36          self.archive = loader.archive
      37  
      38      def open_resource(self, resource):
      39          try:
      40              return super().open_resource(resource)
      41          except KeyError as exc:
      42              raise FileNotFoundError(exc.args[0])
      43  
      44      def is_resource(self, path):
      45          """
      46          Workaround for `zipfile.Path.is_file` returning true
      47          for non-existent paths.
      48          """
      49          target = self.files().joinpath(path)
      50          return target.is_file() and target.exists()
      51  
      52      def files(self):
      53          return zipfile.Path(self.archive, self.prefix)
      54  
      55  
      56  class ESC[4;38;5;81mMultiplexedPath(ESC[4;38;5;149mabcESC[4;38;5;149m.ESC[4;38;5;149mTraversable):
      57      """
      58      Given a series of Traversable objects, implement a merged
      59      version of the interface across all objects. Useful for
      60      namespace packages which may be multihomed at a single
      61      name.
      62      """
      63  
      64      def __init__(self, *paths):
      65          self._paths = list(map(pathlib.Path, remove_duplicates(paths)))
      66          if not self._paths:
      67              message = 'MultiplexedPath must contain at least one path'
      68              raise FileNotFoundError(message)
      69          if not all(path.is_dir() for path in self._paths):
      70              raise NotADirectoryError('MultiplexedPath only supports directories')
      71  
      72      def iterdir(self):
      73          children = (child for path in self._paths for child in path.iterdir())
      74          by_name = operator.attrgetter('name')
      75          groups = itertools.groupby(sorted(children, key=by_name), key=by_name)
      76          return map(self._follow, (locs for name, locs in groups))
      77  
      78      def read_bytes(self):
      79          raise FileNotFoundError(f'{self} is not a file')
      80  
      81      def read_text(self, *args, **kwargs):
      82          raise FileNotFoundError(f'{self} is not a file')
      83  
      84      def is_dir(self):
      85          return True
      86  
      87      def is_file(self):
      88          return False
      89  
      90      def joinpath(self, *descendants):
      91          try:
      92              return super().joinpath(*descendants)
      93          except abc.TraversalError:
      94              # One of the paths did not resolve (a directory does not exist).
      95              # Just return something that will not exist.
      96              return self._paths[0].joinpath(*descendants)
      97  
      98      @classmethod
      99      def _follow(cls, children):
     100          """
     101          Construct a MultiplexedPath if needed.
     102  
     103          If children contains a sole element, return it.
     104          Otherwise, return a MultiplexedPath of the items.
     105          Unless one of the items is not a Directory, then return the first.
     106          """
     107          subdirs, one_dir, one_file = itertools.tee(children, 3)
     108  
     109          try:
     110              return only(one_dir)
     111          except ValueError:
     112              try:
     113                  return cls(*subdirs)
     114              except NotADirectoryError:
     115                  return next(one_file)
     116  
     117      def open(self, *args, **kwargs):
     118          raise FileNotFoundError(f'{self} is not a file')
     119  
     120      @property
     121      def name(self):
     122          return self._paths[0].name
     123  
     124      def __repr__(self):
     125          paths = ', '.join(f"'{path}'" for path in self._paths)
     126          return f'MultiplexedPath({paths})'
     127  
     128  
     129  class ESC[4;38;5;81mNamespaceReader(ESC[4;38;5;149mabcESC[4;38;5;149m.ESC[4;38;5;149mTraversableResources):
     130      def __init__(self, namespace_path):
     131          if 'NamespacePath' not in str(namespace_path):
     132              raise ValueError('Invalid path')
     133          self.path = MultiplexedPath(*list(namespace_path))
     134  
     135      def resource_path(self, resource):
     136          """
     137          Return the file system path to prevent
     138          `resources.path()` from creating a temporary
     139          copy.
     140          """
     141          return str(self.path.joinpath(resource))
     142  
     143      def files(self):
     144          return self.path