]>
Commit | Line | Data |
---|---|---|
53e6db90 DC |
1 | """Utilities for extracting common archive formats""" |
2 | ||
3 | import zipfile | |
4 | import tarfile | |
5 | import os | |
6 | import shutil | |
7 | import posixpath | |
8 | import contextlib | |
9 | from distutils.errors import DistutilsError | |
10 | ||
11 | from ._path import ensure_directory | |
12 | ||
13 | __all__ = [ | |
14 | "unpack_archive", "unpack_zipfile", "unpack_tarfile", "default_filter", | |
15 | "UnrecognizedFormat", "extraction_drivers", "unpack_directory", | |
16 | ] | |
17 | ||
18 | ||
class UnrecognizedFormat(DistutilsError):
    """Raised when no extraction driver recognizes the given archive.

    Each ``unpack_*`` driver in this module raises it to signal "not my
    format"; ``unpack_archive`` raises it once every driver has declined.
    """
21 | ||
22 | ||
def default_filter(src, dst):
    """The default progress/filter callback; accepts every file unchanged.

    `src` is the path of the item inside the archive and `dst` is the
    filesystem path proposed for it.  The proposed `dst` is returned as-is,
    so nothing is skipped and no extraction path is altered.
    """
    return dst
26 | ||
27 | ||
def unpack_archive(
        filename, extract_dir, progress_filter=default_filter,
        drivers=None):
    """Unpack `filename` to `extract_dir`, or raise ``UnrecognizedFormat``

    `progress_filter` is a function taking two arguments: a source path
    internal to the archive ('/'-separated), and a filesystem path where it
    will be extracted.  The callback must return the desired extract path
    (which may be the same as the one passed in), or else ``None`` to skip
    that file or directory.  The callback can thus be used to report on the
    progress of the extraction, as well as to filter the items extracted or
    alter their extraction paths.

    `drivers`, if supplied, must be a non-empty sequence of functions with the
    same signature as this function (minus the `drivers` argument), that raise
    ``UnrecognizedFormat`` if they do not support extracting the designated
    archive type.  The `drivers` are tried in sequence until one is found that
    does not raise ``UnrecognizedFormat``, or until all are exhausted (in
    which case ``UnrecognizedFormat`` is raised).  Any other exception raised
    by a driver propagates to the caller.  If you do not supply a sequence of
    drivers (or supply an empty one), the module's ``extraction_drivers``
    constant will be used, which means that ``unpack_directory``,
    ``unpack_zipfile`` and ``unpack_tarfile`` will be tried, in that order.
    """
    for attempt in drivers or extraction_drivers:
        try:
            attempt(filename, extract_dir, progress_filter)
        except UnrecognizedFormat:
            # This driver does not understand the input; try the next one.
            continue
        # First driver that did not decline wins.
        return

    raise UnrecognizedFormat(
        "Not a recognized archive type: %s" % filename
    )
62 | ||
63 | ||
def unpack_directory(filename, extract_dir, progress_filter=default_filter):
    """Copy a directory tree as though it were an archive being unpacked.

    Walks `filename` and copies each regular entry reported by ``os.walk``
    into `extract_dir`, offering every file to `progress_filter` first (see
    ``unpack_archive()`` for the callback contract).

    Raises ``UnrecognizedFormat`` if `filename` is not a directory
    """
    if not os.path.isdir(filename):
        raise UnrecognizedFormat("%s is not a directory" % filename)

    # Maps every directory visited by os.walk to a pair of
    # ('/'-separated archive-style prefix, destination directory).
    locations = {filename: ('', extract_dir)}

    for root, subdirs, entries in os.walk(filename):
        prefix, dest_dir = locations[root]

        # Register children before os.walk descends into them.
        for sub in subdirs:
            locations[os.path.join(root, sub)] = (
                prefix + sub + '/',
                os.path.join(dest_dir, sub),
            )

        for entry in entries:
            proposed = os.path.join(dest_dir, entry)
            dest = progress_filter(prefix + entry, proposed)
            if not dest:
                # the filter declined this file
                continue
            ensure_directory(dest)
            origin = os.path.join(root, entry)
            # copy contents first, then metadata (mode, timestamps, ...)
            shutil.copyfile(origin, dest)
            shutil.copystat(origin, dest)
89 | ||
90 | ||
def unpack_zipfile(filename, extract_dir, progress_filter=default_filter):
    """Extract the zip archive `filename` into `extract_dir`

    ``zipfile.is_zipfile()`` decides whether `filename` is a zip archive; if
    it is not, ``UnrecognizedFormat`` is raised.  The `progress_filter`
    argument is described in ``unpack_archive()``.
    """
    if zipfile.is_zipfile(filename):
        with zipfile.ZipFile(filename) as archive:
            _unpack_zipfile_obj(archive, extract_dir, progress_filter)
        return

    raise UnrecognizedFormat("%s is not a zip file" % (filename,))
104 | ||
105 | ||
def _unpack_zipfile_obj(zipfile_obj, extract_dir, progress_filter=default_filter):
    """Internal/private API used by other parts of setuptools.

    Works like ``unpack_zipfile``, except that the caller passes an already
    opened :obj:`zipfile.ZipFile` object rather than a filename.
    """
    for entry in zipfile_obj.infolist():
        arcname = entry.filename
        parts = arcname.split('/')

        # refuse absolute paths and anything containing a '..' component
        if arcname.startswith('/') or '..' in parts:
            continue

        destination = progress_filter(
            arcname, os.path.join(extract_dir, *parts))
        if not destination:
            continue

        # Needed for directory entries and file entries alike.
        # NOTE(review): ensure_directory presumably creates the parent
        # directory of the given path -- confirm against ._path.
        ensure_directory(destination)

        if not arcname.endswith('/'):
            # regular file entry: write out its contents
            payload = zipfile_obj.read(entry.filename)
            with open(destination, 'wb') as out:
                out.write(payload)

        # The high 16 bits of external_attr carry the Unix mode, when set.
        mode = entry.external_attr >> 16
        if mode:
            os.chmod(destination, mode)
134 | ||
135 | ||
def _resolve_tar_file_or_dir(tar_obj, tar_member_obj):
    """Resolve any links and extract link targets as normal files.

    Follows hard links and symbolic links starting at `tar_member_obj`,
    looking each target up inside `tar_obj`, until a member that is not a
    link is reached.  Symlink targets are interpreted relative to the
    directory of the link and normalized; hard-link targets are used as
    stored.

    Returns the final member when it is a regular file or a directory.

    Raises ``LookupError`` when the chain ends in a missing member, in a
    member of any other type, or loops back on itself.
    """
    # Identities of the link members already followed.  The members are
    # kept alive by ``tar_obj``, so their ids are stable during the walk.
    visited = set()

    while tar_member_obj is not None and (
            tar_member_obj.islnk() or tar_member_obj.issym()):
        if id(tar_member_obj) in visited:
            # A crafted archive may contain a link cycle (a -> b -> a);
            # without this guard the loop would never terminate.
            raise LookupError('Got circular link')
        visited.add(id(tar_member_obj))

        linkpath = tar_member_obj.linkname
        if tar_member_obj.issym():
            base = posixpath.dirname(tar_member_obj.name)
            linkpath = posixpath.join(base, linkpath)
            linkpath = posixpath.normpath(linkpath)
        tar_member_obj = tar_obj._getmember(linkpath)

    is_file_or_dir = (
        tar_member_obj is not None and
        (tar_member_obj.isfile() or tar_member_obj.isdir())
    )
    if is_file_or_dir:
        return tar_member_obj

    raise LookupError('Got unknown file type')
155 | ||
156 | ||
def _iter_open_tar(tar_obj, extract_dir, progress_filter):
    """Emit member-destination pairs from a tar archive.

    Yields ``(member, destination)`` tuples for every archive entry that
    should be extracted; links are replaced by the file or directory they
    resolve to.  `tar_obj` is closed once iteration finishes.
    """
    # don't do any chowning!
    tar_obj.chown = lambda *args: None

    with contextlib.closing(tar_obj):
        for entry in tar_obj:
            arcname = entry.name
            parts = arcname.split('/')

            # refuse absolute paths and anything containing a '..' component
            if arcname.startswith('/') or '..' in parts:
                continue

            provisional = os.path.join(extract_dir, *parts)

            try:
                resolved = _resolve_tar_file_or_dir(tar_obj, entry)
            except LookupError:
                # not a file/directory (or an unresolvable link): skip it
                continue

            destination = progress_filter(arcname, provisional)
            if not destination:
                continue

            # drop a single trailing separator, if present
            if destination.endswith(os.sep):
                destination = destination[:-1]

            yield resolved, destination
184 | ||
185 | ||
def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
    """Extract the tar/tar.gz/tar.bz2 archive `filename` into `extract_dir`

    ``tarfile.open()`` decides whether `filename` is a tar archive; when it
    fails with a ``tarfile.TarError``, ``UnrecognizedFormat`` is raised
    instead.  The `progress_filter` argument is described in
    ``unpack_archive()``.  Returns ``True`` once extraction has finished.
    """
    try:
        archive = tarfile.open(filename)
    except tarfile.TarError as exc:
        raise UnrecognizedFormat(
            "%s is not a compressed or uncompressed tar file" % (filename,)
        ) from exc

    pairs = _iter_open_tar(archive, extract_dir, progress_filter)
    for entry, destination in pairs:
        # chown/chmod/mkfifo/mknode/makedev failures are deliberately ignored
        with contextlib.suppress(tarfile.ExtractError):
            # XXX relies on a private tarfile API
            archive._extract_member(entry, destination)

    return True
211 | ||
212 | ||
# Default drivers for ``unpack_archive``, tried in this order.
extraction_drivers = (unpack_directory, unpack_zipfile, unpack_tarfile)