1 import collections
2 import itertools
3 import pathlib
4 import operator
5 import zipfile
6
7 from . import abc
8
9 from ._itertools import only
10
11
def remove_duplicates(items):
    """
    Return an iterator over the distinct entries of *items*.

    Each entry appears once, at the position of its first
    occurrence. *items* is fully consumed before the iterator is
    handed back, and its entries must be hashable.
    """
    first_seen = collections.OrderedDict()
    for entry in items:
        # setdefault leaves an already-recorded key (and its position) alone.
        first_seen.setdefault(entry, None)
    return iter(first_seen)
14
15
16 class ESC[4;38;5;81mFileReader(ESC[4;38;5;149mabcESC[4;38;5;149m.ESC[4;38;5;149mTraversableResources):
17 def __init__(self, loader):
18 self.path = pathlib.Path(loader.path).parent
19
20 def resource_path(self, resource):
21 """
22 Return the file system path to prevent
23 `resources.path()` from creating a temporary
24 copy.
25 """
26 return str(self.path.joinpath(resource))
27
28 def files(self):
29 return self.path
30
31
32 class ESC[4;38;5;81mZipReader(ESC[4;38;5;149mabcESC[4;38;5;149m.ESC[4;38;5;149mTraversableResources):
33 def __init__(self, loader, module):
34 _, _, name = module.rpartition('.')
35 self.prefix = loader.prefix.replace('\\', '/') + name + '/'
36 self.archive = loader.archive
37
38 def open_resource(self, resource):
39 try:
40 return super().open_resource(resource)
41 except KeyError as exc:
42 raise FileNotFoundError(exc.args[0])
43
44 def is_resource(self, path):
45 """
46 Workaround for `zipfile.Path.is_file` returning true
47 for non-existent paths.
48 """
49 target = self.files().joinpath(path)
50 return target.is_file() and target.exists()
51
52 def files(self):
53 return zipfile.Path(self.archive, self.prefix)
54
55
56 class ESC[4;38;5;81mMultiplexedPath(ESC[4;38;5;149mabcESC[4;38;5;149m.ESC[4;38;5;149mTraversable):
57 """
58 Given a series of Traversable objects, implement a merged
59 version of the interface across all objects. Useful for
60 namespace packages which may be multihomed at a single
61 name.
62 """
63
64 def __init__(self, *paths):
65 self._paths = list(map(pathlib.Path, remove_duplicates(paths)))
66 if not self._paths:
67 message = 'MultiplexedPath must contain at least one path'
68 raise FileNotFoundError(message)
69 if not all(path.is_dir() for path in self._paths):
70 raise NotADirectoryError('MultiplexedPath only supports directories')
71
72 def iterdir(self):
73 children = (child for path in self._paths for child in path.iterdir())
74 by_name = operator.attrgetter('name')
75 groups = itertools.groupby(sorted(children, key=by_name), key=by_name)
76 return map(self._follow, (locs for name, locs in groups))
77
78 def read_bytes(self):
79 raise FileNotFoundError(f'{self} is not a file')
80
81 def read_text(self, *args, **kwargs):
82 raise FileNotFoundError(f'{self} is not a file')
83
84 def is_dir(self):
85 return True
86
87 def is_file(self):
88 return False
89
90 def joinpath(self, *descendants):
91 try:
92 return super().joinpath(*descendants)
93 except abc.TraversalError:
94 # One of the paths did not resolve (a directory does not exist).
95 # Just return something that will not exist.
96 return self._paths[0].joinpath(*descendants)
97
98 @classmethod
99 def _follow(cls, children):
100 """
101 Construct a MultiplexedPath if needed.
102
103 If children contains a sole element, return it.
104 Otherwise, return a MultiplexedPath of the items.
105 Unless one of the items is not a Directory, then return the first.
106 """
107 subdirs, one_dir, one_file = itertools.tee(children, 3)
108
109 try:
110 return only(one_dir)
111 except ValueError:
112 try:
113 return cls(*subdirs)
114 except NotADirectoryError:
115 return next(one_file)
116
117 def open(self, *args, **kwargs):
118 raise FileNotFoundError(f'{self} is not a file')
119
120 @property
121 def name(self):
122 return self._paths[0].name
123
124 def __repr__(self):
125 paths = ', '.join(f"'{path}'" for path in self._paths)
126 return f'MultiplexedPath({paths})'
127
128
129 class ESC[4;38;5;81mNamespaceReader(ESC[4;38;5;149mabcESC[4;38;5;149m.ESC[4;38;5;149mTraversableResources):
130 def __init__(self, namespace_path):
131 if 'NamespacePath' not in str(namespace_path):
132 raise ValueError('Invalid path')
133 self.path = MultiplexedPath(*list(namespace_path))
134
135 def resource_path(self, resource):
136 """
137 Return the file system path to prevent
138 `resources.path()` from creating a temporary
139 copy.
140 """
141 return str(self.path.joinpath(resource))
142
143 def files(self):
144 return self.path