1 """Interface to the liblzma compression library.
2
3 This module provides a class for reading and writing compressed files,
4 classes for incremental (de)compression, and convenience functions for
5 one-shot (de)compression.
6
7 These classes and functions support both the XZ and legacy LZMA
8 container formats, as well as raw compressed data streams.
9 """
10
# Names exported by ``from lzma import *``.  The order of the entries is
# kept exactly as originally declared.
__all__ = [
    # CHECK_* constants
    "CHECK_NONE",
    "CHECK_CRC32",
    "CHECK_CRC64",
    "CHECK_SHA256",
    "CHECK_ID_MAX",
    "CHECK_UNKNOWN",
    # FILTER_* constants
    "FILTER_LZMA1",
    "FILTER_LZMA2",
    "FILTER_DELTA",
    "FILTER_X86",
    "FILTER_IA64",
    "FILTER_ARM",
    "FILTER_ARMTHUMB",
    "FILTER_POWERPC",
    "FILTER_SPARC",
    # FORMAT_* constants
    "FORMAT_AUTO",
    "FORMAT_XZ",
    "FORMAT_ALONE",
    "FORMAT_RAW",
    # MF_* constants
    "MF_HC3",
    "MF_HC4",
    "MF_BT2",
    "MF_BT3",
    "MF_BT4",
    # MODE_* and PRESET_* constants
    "MODE_FAST",
    "MODE_NORMAL",
    "PRESET_DEFAULT",
    "PRESET_EXTREME",

    # Classes and the module's exception type
    "LZMACompressor",
    "LZMADecompressor",
    "LZMAFile",
    "LZMAError",
    # Module-level functions
    "open",
    "compress",
    "decompress",
    "is_check_supported",
]
23
24 import builtins
25 import io
26 import os
27 from _lzma import *
28 from _lzma import _encode_filter_properties, _decode_filter_properties
29 import _compression
30
31
# State codes stored in LZMAFile._mode.
_MODE_CLOSED = 0
_MODE_READ = 1
# Value 2 no longer used
_MODE_WRITE = 3


class LZMAFile(_compression.BaseStream):

    """A file object providing transparent LZMA (de)compression.

    An LZMAFile can act as a wrapper for an existing file object, or
    refer directly to a named file on disk.

    Note that LZMAFile provides a *binary* file interface - data read
    is returned as bytes, and data to be written must be given as bytes.
    """

    def __init__(self, filename=None, mode="r", *,
                 format=None, check=-1, preset=None, filters=None):
        """Open an LZMA-compressed file in binary mode.

        filename can be either an actual file name (given as a str,
        bytes, or PathLike object), in which case the named file is
        opened, or it can be an existing file object to read from or
        write to.

        mode can be "r" for reading (default), "w" for (over)writing,
        "x" for creating exclusively, or "a" for appending. These can
        equivalently be given as "rb", "wb", "xb" and "ab" respectively.

        format specifies the container format to use for the file.
        If mode is "r", this defaults to FORMAT_AUTO. Otherwise, the
        default is FORMAT_XZ.

        check specifies the integrity check to use. This argument can
        only be used when opening a file for writing. For FORMAT_XZ,
        the default is CHECK_CRC64. FORMAT_ALONE and FORMAT_RAW do not
        support integrity checks - for these formats, check must be
        omitted, or be CHECK_NONE.

        When opening a file for reading, the *preset* argument is not
        meaningful, and should be omitted. The *filters* argument should
        also be omitted, except when format is FORMAT_RAW (in which case
        it is required).

        When opening a file for writing, the settings used by the
        compressor can be specified either as a preset compression
        level (with the *preset* argument), or in detail as a custom
        filter chain (with the *filters* argument). For FORMAT_XZ and
        FORMAT_ALONE, the default is to use the PRESET_DEFAULT preset
        level. For FORMAT_RAW, the caller must always specify a filter
        chain; the raw compressor does not support preset compression
        levels.

        preset (if provided) should be an integer in the range 0-9,
        optionally OR-ed with the constant PRESET_EXTREME.

        filters (if provided) should be a sequence of dicts. Each dict
        should have an entry for "id" indicating ID of the filter, plus
        additional entries for options to the filter.

        Raises ValueError if mode is not one of the accepted strings, or
        if check or preset is given together with a read mode. Raises
        TypeError if filename is neither a path nor an object with a
        read or write attribute. If setting up the reader fails after the
        underlying file was attached, a file opened by this constructor
        is closed again and the object is left in the closed state before
        the error propagates.
        """
        # Start out "closed" so that close() is a harmless no-op if any of
        # the validation below raises.
        self._fp = None
        self._closefp = False
        self._mode = _MODE_CLOSED

        if mode in ("r", "rb"):
            if check != -1:
                raise ValueError("Cannot specify an integrity check "
                                 "when opening a file for reading")
            if preset is not None:
                raise ValueError("Cannot specify a preset compression "
                                 "level when opening a file for reading")
            if format is None:
                format = FORMAT_AUTO
            mode_code = _MODE_READ
        elif mode in ("w", "wb", "a", "ab", "x", "xb"):
            if format is None:
                format = FORMAT_XZ
            mode_code = _MODE_WRITE
            # Built before any file is opened, so bad compressor settings
            # are reported without touching the filesystem.
            self._compressor = LZMACompressor(format=format, check=check,
                                              preset=preset, filters=filters)
            self._pos = 0
        else:
            raise ValueError("Invalid mode: {!r}".format(mode))

        if isinstance(filename, (str, bytes, os.PathLike)):
            if "b" not in mode:
                mode += "b"
            self._fp = builtins.open(filename, mode)
            # Only a file opened here is closed by close().
            self._closefp = True
            self._mode = mode_code
        elif hasattr(filename, "read") or hasattr(filename, "write"):
            self._fp = filename
            self._mode = mode_code
        else:
            raise TypeError("filename must be a str, bytes, file or PathLike object")

        if self._mode == _MODE_READ:
            try:
                raw = _compression.DecompressReader(self._fp, LZMADecompressor,
                    trailing_error=LZMAError, format=format, filters=filters)
                self._buffer = io.BufferedReader(raw)
            except BaseException:
                # The file is already attached at this point.  Release it
                # (closing it only if it was opened above) and fall back to
                # the closed state, so a failed constructor neither leaks a
                # file handle nor leaves a half-initialised reader behind.
                try:
                    if self._closefp:
                        self._fp.close()
                finally:
                    self._fp = None
                    self._closefp = False
                    self._mode = _MODE_CLOSED
                raise

    def close(self):
        """Flush and close the file.

        May be called more than once without error. Once the file is
        closed, any other operation on it will raise a ValueError.
        """
        if self._mode == _MODE_CLOSED:
            return
        try:
            if self._mode == _MODE_READ:
                self._buffer.close()
                self._buffer = None
            elif self._mode == _MODE_WRITE:
                # Emit whatever the compressor is still holding back.
                self._fp.write(self._compressor.flush())
                self._compressor = None
        finally:
            # Always release the underlying file and reset the state, even
            # if flushing or closing the buffer raised.
            try:
                if self._closefp:
                    self._fp.close()
            finally:
                self._fp = None
                self._closefp = False
                self._mode = _MODE_CLOSED

    @property
    def closed(self):
        """True if this file is closed."""
        return self._mode == _MODE_CLOSED

    def fileno(self):
        """Return the file descriptor for the underlying file."""
        self._check_not_closed()
        return self._fp.fileno()

    def seekable(self):
        """Return whether the file supports seeking."""
        # Only read-mode objects have a _buffer; readable() short-circuits
        # the second operand for write-mode objects.
        return self.readable() and self._buffer.seekable()

    def readable(self):
        """Return whether the file was opened for reading."""
        self._check_not_closed()
        return self._mode == _MODE_READ

    def writable(self):
        """Return whether the file was opened for writing."""
        self._check_not_closed()
        return self._mode == _MODE_WRITE

    def peek(self, size=-1):
        """Return buffered data without advancing the file position.

        Always returns at least one byte of data, unless at EOF.
        The exact number of bytes returned is unspecified.
        """
        self._check_can_read()
        # Relies on the undocumented fact that BufferedReader.peek() always
        # returns at least one byte (except at EOF)
        return self._buffer.peek(size)

    def read(self, size=-1):
        """Read up to size uncompressed bytes from the file.

        If size is negative or omitted, read until EOF is reached.
        Returns b"" if the file is already at EOF.
        """
        self._check_can_read()
        return self._buffer.read(size)

    def read1(self, size=-1):
        """Read up to size uncompressed bytes, while trying to avoid
        making multiple reads from the underlying stream. Reads up to a
        buffer's worth of data if size is negative.

        Returns b"" if the file is at EOF.
        """
        self._check_can_read()
        if size < 0:
            size = io.DEFAULT_BUFFER_SIZE
        return self._buffer.read1(size)

    def readline(self, size=-1):
        """Read a line of uncompressed bytes from the file.

        The terminating newline (if present) is retained. If size is
        non-negative, no more than size bytes will be read (in which
        case the line may be incomplete). Returns b'' if already at EOF.
        """
        self._check_can_read()
        return self._buffer.readline(size)

    def write(self, data):
        """Write a bytes object to the file.

        Returns the number of uncompressed bytes written, which is
        always the length of data in bytes. Note that due to buffering,
        the file on disk may not reflect the data written until close()
        is called.
        """
        self._check_can_write()
        if isinstance(data, (bytes, bytearray)):
            length = len(data)
        else:
            # accept any data that supports the buffer protocol
            data = memoryview(data)
            # nbytes, not len(): the view's items may be wider than a byte.
            length = data.nbytes

        compressed = self._compressor.compress(data)
        self._fp.write(compressed)
        # _pos counts uncompressed bytes; tell() reports it in write mode.
        self._pos += length
        return length

    def seek(self, offset, whence=io.SEEK_SET):
        """Change the file position.

        The new position is specified by offset, relative to the
        position indicated by whence. Possible values for whence are:

            0: start of stream (default): offset must not be negative
            1: current stream position
            2: end of stream; offset must not be positive

        Returns the new file position.

        Note that seeking is emulated, so depending on the parameters,
        this operation may be extremely slow.
        """
        self._check_can_seek()
        return self._buffer.seek(offset, whence)

    def tell(self):
        """Return the current file position."""
        self._check_not_closed()
        if self._mode == _MODE_READ:
            return self._buffer.tell()
        return self._pos
269
270
def open(filename, mode="rb", *,
         format=None, check=-1, preset=None, filters=None,
         encoding=None, errors=None, newline=None):
    """Open an LZMA-compressed file in binary or text mode.

    filename is either a file name (str, bytes or PathLike object), in
    which case that file is opened, or an already-open file object to
    read from or write to.

    mode is one of "r", "rb" (the default), "w", "wb", "x", "xb", "a" or
    "ab" for binary access, or "rt", "wt", "xt" or "at" for text access.
    Combining "t" with "b" raises ValueError.

    format, check, preset and filters select the compression settings
    and are passed through to LZMAFile unchanged.

    In binary mode the result is the LZMAFile itself, exactly as if
    LZMAFile(filename, mode, ...) had been called; supplying encoding,
    errors or newline in this mode raises ValueError.

    In text mode the LZMAFile is wrapped in an io.TextIOWrapper that is
    configured with the given encoding, error handling behavior and
    line ending(s).

    """
    text_mode = "t" in mode

    if text_mode:
        if "b" in mode:
            raise ValueError("Invalid mode: %r" % (mode,))
    else:
        # Text-only arguments are rejected in binary mode, checked in
        # signature order so the first offending one is reported.
        text_only = (
            (encoding, "Argument 'encoding' not supported in binary mode"),
            (errors, "Argument 'errors' not supported in binary mode"),
            (newline, "Argument 'newline' not supported in binary mode"),
        )
        for value, complaint in text_only:
            if value is not None:
                raise ValueError(complaint)

    # LZMAFile only understands binary modes, so drop the "t" flag.
    stream = LZMAFile(filename, mode.replace("t", ""), format=format,
                      check=check, preset=preset, filters=filters)
    if not text_mode:
        return stream

    return io.TextIOWrapper(stream, io.text_encoding(encoding),
                            errors, newline)
317
318
def compress(data, format=FORMAT_XZ, check=-1, preset=None, filters=None):
    """Compress *data* in one shot and return the compressed bytes.

    The optional arguments *format*, *check*, *preset* and *filters*
    are handed to LZMACompressor; see its docstring for their meaning.

    Use an LZMACompressor directly when the input arrives in pieces.
    """
    encoder = LZMACompressor(format, check, preset, filters)
    body = encoder.compress(data)
    # flush() yields whatever the compressor has not emitted yet.
    tail = encoder.flush()
    return body + tail
329
330
def decompress(data, format=FORMAT_AUTO, memlimit=None, filters=None):
    """Decompress a block of data in one shot.

    The optional arguments *format*, *memlimit* and *filters* are
    handed to LZMADecompressor; see its docstring for their meaning.

    *data* may hold several compressed streams back to back; their
    decompressed contents are concatenated. Bytes following a complete
    stream that do not decode as a further stream are ignored. LZMAError
    is raised if the first stream cannot be decoded, or if the data ends
    before a stream's end-of-stream marker.

    Use an LZMADecompressor directly when the input arrives in pieces.
    """
    pieces = []
    pending = data
    while True:
        # A fresh decompressor is created for every stream in the input.
        decoder = LZMADecompressor(format, memlimit, filters)
        try:
            piece = decoder.decompress(pending)
        except LZMAError:
            if not pieces:
                raise  # Error on the first iteration; bail out.
            # Leftover data is not a valid LZMA/XZ stream; ignore it.
            break
        pieces.append(piece)
        if not decoder.eof:
            raise LZMAError("Compressed data ended before the "
                            "end-of-stream marker was reached")
        # Whatever follows the finished stream is tried as another stream.
        pending = decoder.unused_data
        if not pending:
            break
    return b"".join(pieces)