]>
Commit | Line | Data |
---|---|---|
53e6db90 DC |
1 | import io |
2 | import posixpath | |
3 | import zipfile | |
4 | import itertools | |
5 | import contextlib | |
6 | import pathlib | |
7 | import re | |
8 | ||
9 | from .py310compat import text_encoding | |
10 | from .glob import translate | |
11 | ||
12 | ||
13 | __all__ = ['Path'] | |
14 | ||
15 | ||
def _parents(path):
    """
    Yield every proper ancestor of a posixpath.sep-delimited path,
    nearest first. The path itself is not included.

    >>> list(_parents('b/d'))
    ['b']
    >>> list(_parents('/b/d/'))
    ['/b']
    >>> list(_parents('b/d/f/'))
    ['b/d', 'b']
    >>> list(_parents('b'))
    []
    >>> list(_parents(''))
    []
    """
    # The first item of the lineage is the path itself; skip it.
    lineage = _ancestry(path)
    return itertools.islice(lineage, 1, None)
33 | ||
34 | ||
35 | def _ancestry(path): | |
36 | """ | |
37 | Given a path with elements separated by | |
38 | posixpath.sep, generate all elements of that path | |
39 | ||
40 | >>> list(_ancestry('b/d')) | |
41 | ['b/d', 'b'] | |
42 | >>> list(_ancestry('/b/d/')) | |
43 | ['/b/d', '/b'] | |
44 | >>> list(_ancestry('b/d/f/')) | |
45 | ['b/d/f', 'b/d', 'b'] | |
46 | >>> list(_ancestry('b')) | |
47 | ['b'] | |
48 | >>> list(_ancestry('')) | |
49 | [] | |
50 | """ | |
51 | path = path.rstrip(posixpath.sep) | |
52 | while path and path != posixpath.sep: | |
53 | yield path | |
54 | path, tail = posixpath.split(path) | |
55 | ||
56 | ||
# Alias for dict.fromkeys: calling it on an iterable yields a dict whose
# keys are the distinct items in first-seen order (values are all None).
# Callers that need a list must wrap the result in list().
_dedupe = dict.fromkeys
"""Deduplicate an iterable in original order"""
59 | ||
60 | ||
61 | def _difference(minuend, subtrahend): | |
62 | """ | |
63 | Return items in minuend not in subtrahend, retaining order | |
64 | with O(1) lookup. | |
65 | """ | |
66 | return itertools.filterfalse(set(subtrahend).__contains__, minuend) | |
67 | ||
68 | ||
class InitializedState:
    """
    Mix-in to save the initialization state for pickling.

    The positional and keyword arguments passed to ``__init__`` are
    recorded and used as the pickled state; restoring re-runs the
    initializer of the next class in the MRO with those same
    arguments.
    """

    def __init__(self, *args, **kwargs):
        """Record the constructor arguments, then initialize as usual."""
        self.__args = args
        self.__kwargs = kwargs
        super().__init__(*args, **kwargs)

    def __getstate__(self):
        """Return the ``(args, kwargs)`` pair captured at construction."""
        return self.__args, self.__kwargs

    def __setstate__(self, state):
        """
        Rebuild the object by re-running initialization with the
        saved ``(args, kwargs)`` pair.
        """
        args, kwargs = state
        # Go through this class's own __init__ rather than straight to
        # super(): that way the arguments are recorded again, so a
        # restored object can itself be pickled. Calling super() directly
        # would leave the saved-argument attributes unset and make a
        # second __getstate__ fail with AttributeError.
        InitializedState.__init__(self, *args, **kwargs)
85 | ||
86 | ||
class CompleteDirs(InitializedState, zipfile.ZipFile):
    """
    ZipFile variant whose namelist also reports directories that
    are only implied by the entries stored beneath them.

    >>> list(CompleteDirs._implied_dirs(['foo/bar.txt', 'foo/bar/baz.txt']))
    ['foo/', 'foo/bar/']
    >>> list(CompleteDirs._implied_dirs(['foo/bar.txt', 'foo/bar/baz.txt', 'foo/bar/']))
    ['foo/']
    """

    @staticmethod
    def _implied_dirs(names):
        """
        Return the parent directories (with trailing separator) of
        the given names that are not themselves listed in names,
        deduplicated and in first-seen order.
        """
        candidates = (
            parent + posixpath.sep
            for name in names
            for parent in _parents(name)
        )
        return _dedupe(_difference(candidates, names))

    def namelist(self):
        """Return the stored names followed by the implied directories."""
        explicit = super().namelist()
        return [*explicit, *self._implied_dirs(explicit)]

    def _name_set(self):
        """Return the full namelist as a set for membership tests."""
        return {*self.namelist()}

    def resolve_dir(self, name):
        """
        Return name with a trailing slash appended when name is not
        itself a known entry but name + '/' is; otherwise return
        name unchanged.
        """
        known = self._name_set()
        as_dir = name + '/'
        if name in known or as_dir not in known:
            return name
        return as_dir

    def getinfo(self, name):
        """
        Like ZipFile.getinfo, but synthesize a ZipInfo for implied
        directories that have no entry of their own.
        """
        try:
            return super().getinfo(name)
        except KeyError:
            implied = name.endswith('/') and name in self._name_set()
            if not implied:
                raise
            return zipfile.ZipInfo(filename=name)

    @classmethod
    def make(cls, source):
        """
        Return a CompleteDirs (or subclass) for source, which may be
        a filename or a ZipFile. An existing ZipFile is converted in
        place by reassigning its __class__.
        """
        if isinstance(source, CompleteDirs):
            return source

        if not isinstance(source, zipfile.ZipFile):
            return cls(source)

        # Only allow for FastLookup when supplied zipfile is read-only
        target = cls if 'r' in source.mode else CompleteDirs
        source.__class__ = target
        return source

    @classmethod
    def inject(cls, zf: zipfile.ZipFile) -> zipfile.ZipFile:
        """
        Write an empty entry into the writable zip file zf for each
        directory implied by its current names, then return zf.
        """
        missing = cls._implied_dirs(zf.namelist())
        for dirname in missing:
            zf.writestr(dirname, b"")
        return zf
160 | ||
161 | ||
class FastLookup(CompleteDirs):
    """
    CompleteDirs variant that computes the namelist and the name
    set once and serves later calls from the stored result.
    """

    def namelist(self):
        """Return the namelist, computing and storing it on first use."""
        try:
            return self.__names
        except AttributeError:
            pass
        self.__names = super().namelist()
        return self.__names

    def _name_set(self):
        """Return the name set, computing and storing it on first use."""
        try:
            return self.__lookup
        except AttributeError:
            pass
        self.__lookup = super()._name_set()
        return self.__lookup
179 | ||
180 | ||
def _extract_text_encoding(encoding=None, *args, **kwargs):
    """
    Split off the leading ``encoding`` argument, pass it through
    text_encoding, and return it with the remaining args and kwargs.
    """
    # stacklevel=3 so that the caller of the caller see any warning.
    resolved = text_encoding(encoding, 3)
    return resolved, args, kwargs
184 | ||
185 | ||
class Path:
    """
    A pathlib-compatible interface for zip files.

    Consider a zip file with this structure::

        .
        ├── a.txt
        └── b
            ├── c.txt
            └── d
                └── e.txt

    >>> data = io.BytesIO()
    >>> zf = zipfile.ZipFile(data, 'w')
    >>> zf.writestr('a.txt', 'content of a')
    >>> zf.writestr('b/c.txt', 'content of c')
    >>> zf.writestr('b/d/e.txt', 'content of e')
    >>> zf.filename = 'mem/abcde.zip'

    Path accepts the zipfile object itself or a filename

    >>> path = Path(zf)

    From there, several path operations are available.

    Directory iteration (including the zip file itself):

    >>> a, b = path.iterdir()
    >>> a
    Path('mem/abcde.zip', 'a.txt')
    >>> b
    Path('mem/abcde.zip', 'b/')

    name property:

    >>> b.name
    'b'

    join with divide operator:

    >>> c = b / 'c.txt'
    >>> c
    Path('mem/abcde.zip', 'b/c.txt')
    >>> c.name
    'c.txt'

    Read text:

    >>> c.read_text(encoding='utf-8')
    'content of c'

    existence:

    >>> c.exists()
    True
    >>> (b / 'missing.txt').exists()
    False

    Coercion to string:

    >>> import os
    >>> str(c).replace(os.sep, posixpath.sep)
    'mem/abcde.zip/b/c.txt'

    At the root, ``name``, ``filename``, and ``parent``
    resolve to the zipfile.

    >>> str(path)
    'mem/abcde.zip/'
    >>> path.name
    'abcde.zip'
    >>> path.filename == pathlib.Path('mem/abcde.zip')
    True
    >>> str(path.parent)
    'mem'

    If the zipfile has no filename, such attributes are not
    valid and accessing them will raise an Exception.

    >>> zf.filename = None
    >>> path.name
    Traceback (most recent call last):
    ...
    TypeError: ...

    >>> path.filename
    Traceback (most recent call last):
    ...
    TypeError: ...

    >>> path.parent
    Traceback (most recent call last):
    ...
    TypeError: ...

    # workaround python/cpython#106763
    >>> pass
    """

    __repr = "{self.__class__.__name__}({self.root.filename!r}, {self.at!r})"

    def __init__(self, root, at=""):
        """
        Construct a Path from a ZipFile or filename.

        Note: When the source is an existing ZipFile object,
        its type (__class__) will be mutated to a
        specialized type. If the caller wishes to retain the
        original type, the caller should either create a
        separate ZipFile object or pass a filename.
        """
        self.root = FastLookup.make(root)
        self.at = at

    def __eq__(self, other):
        """
        >>> Path(zipfile.ZipFile(io.BytesIO(), 'w')) == 'foo'
        False
        """
        if self.__class__ is other.__class__:
            return (self.root, self.at) == (other.root, other.at)
        return NotImplemented

    def __hash__(self):
        identity = (self.root, self.at)
        return hash(identity)

    def open(self, mode='r', *args, pwd=None, **kwargs):
        """
        Open this entry as text or binary following the semantics
        of ``pathlib.Path.open()`` by passing arguments through
        to io.TextIOWrapper().
        """
        if self.is_dir():
            raise IsADirectoryError(self)
        # Only the first character ('r' or 'w') is passed to ZipFile.open.
        zip_mode = mode[0]
        if not self.exists() and zip_mode == 'r':
            raise FileNotFoundError(self)
        stream = self.root.open(self.at, zip_mode, pwd=pwd)
        if 'b' not in mode:
            # Text mode:
            encoding, args, kwargs = _extract_text_encoding(*args, **kwargs)
            return io.TextIOWrapper(stream, encoding, *args, **kwargs)
        if args or kwargs:
            raise ValueError("encoding args invalid for binary operation")
        return stream

    def _base(self):
        # At the root (empty ``at``) fall back to the zip file's own name.
        target = self.at or self.root.filename
        return pathlib.PurePosixPath(target)

    @property
    def name(self):
        base = self._base()
        return base.name

    @property
    def suffix(self):
        base = self._base()
        return base.suffix

    @property
    def suffixes(self):
        base = self._base()
        return base.suffixes

    @property
    def stem(self):
        base = self._base()
        return base.stem

    @property
    def filename(self):
        archive = pathlib.Path(self.root.filename)
        return archive.joinpath(self.at)

    def read_text(self, *args, **kwargs):
        encoding, args, kwargs = _extract_text_encoding(*args, **kwargs)
        with self.open('r', encoding, *args, **kwargs) as handle:
            return handle.read()

    def read_bytes(self):
        with self.open('rb') as handle:
            return handle.read()

    def _is_child(self, path):
        candidate_parent = posixpath.dirname(path.at.rstrip("/"))
        return candidate_parent == self.at.rstrip("/")

    def _next(self, at):
        factory = self.__class__
        return factory(self.root, at)

    def is_dir(self):
        # The root (empty ``at``) counts as a directory.
        if not self.at:
            return True
        return self.at.endswith("/")

    def is_file(self):
        if not self.exists():
            return False
        return not self.is_dir()

    def exists(self):
        known = self.root._name_set()
        return self.at in known

    def iterdir(self):
        if not self.is_dir():
            raise ValueError("Can't listdir a file")
        candidates = map(self._next, self.root.namelist())
        return filter(self._is_child, candidates)

    def match(self, path_pattern):
        pure = pathlib.PurePosixPath(self.at)
        return pure.match(path_pattern)

    def is_symlink(self):
        """
        Return whether this path is a symlink. Always false (python/cpython#82102).
        """
        return False

    def glob(self, pattern):
        if not pattern:
            raise ValueError(f"Unacceptable pattern: {pattern!r}")

        # Anchor the translated pattern at this path's location.
        anchored = re.escape(self.at) + translate(pattern)
        is_match = re.compile(anchored).fullmatch
        selected = filter(is_match, self.root.namelist())
        return map(self._next, selected)

    def rglob(self, pattern):
        recursive = f'**/{pattern}'
        return self.glob(recursive)

    def relative_to(self, other, *extra):
        here = str(self)
        start = str(other.joinpath(*extra))
        return posixpath.relpath(here, start)

    def __str__(self):
        return posixpath.join(self.root.filename, self.at)

    def __repr__(self):
        return self.__repr.format(self=self)

    def joinpath(self, *other):
        joined = posixpath.join(self.at, *other)
        resolved = self.root.resolve_dir(joined)
        return self._next(resolved)

    __truediv__ = joinpath

    @property
    def parent(self):
        if not self.at:
            return self.filename.parent
        parent_at = posixpath.dirname(self.at.rstrip('/'))
        # A non-root parent is a directory, so give it a trailing slash.
        if parent_at:
            parent_at += '/'
        return self._next(parent_at)
425 | parent_at = posixpath.dirname(self.at.rstrip('/')) | |
426 | if parent_at: | |
427 | parent_at += '/' | |
428 | return self._next(parent_at) |