(root)/
Python-3.11.7/
Lib/
lzma.py
       1  """Interface to the liblzma compression library.
       2  
       3  This module provides a class for reading and writing compressed files,
       4  classes for incremental (de)compression, and convenience functions for
       5  one-shot (de)compression.
       6  
       7  These classes and functions support both the XZ and legacy LZMA
       8  container formats, as well as raw compressed data streams.
       9  """
      10  
      11  __all__ = [
      12      "CHECK_NONE", "CHECK_CRC32", "CHECK_CRC64", "CHECK_SHA256",
      13      "CHECK_ID_MAX", "CHECK_UNKNOWN",
      14      "FILTER_LZMA1", "FILTER_LZMA2", "FILTER_DELTA", "FILTER_X86", "FILTER_IA64",
      15      "FILTER_ARM", "FILTER_ARMTHUMB", "FILTER_POWERPC", "FILTER_SPARC",
      16      "FORMAT_AUTO", "FORMAT_XZ", "FORMAT_ALONE", "FORMAT_RAW",
      17      "MF_HC3", "MF_HC4", "MF_BT2", "MF_BT3", "MF_BT4",
      18      "MODE_FAST", "MODE_NORMAL", "PRESET_DEFAULT", "PRESET_EXTREME",
      19  
      20      "LZMACompressor", "LZMADecompressor", "LZMAFile", "LZMAError",
      21      "open", "compress", "decompress", "is_check_supported",
      22  ]
      23  
      24  import builtins
      25  import io
      26  import os
      27  from _lzma import *
      28  from _lzma import _encode_filter_properties, _decode_filter_properties
      29  import _compression
      30  
      31  
      32  _MODE_CLOSED   = 0
      33  _MODE_READ     = 1
      34  # Value 2 no longer used
      35  _MODE_WRITE    = 3
      36  
      37  
      38  class ESC[4;38;5;81mLZMAFile(ESC[4;38;5;149m_compressionESC[4;38;5;149m.ESC[4;38;5;149mBaseStream):
      39  
      40      """A file object providing transparent LZMA (de)compression.
      41  
      42      An LZMAFile can act as a wrapper for an existing file object, or
      43      refer directly to a named file on disk.
      44  
      45      Note that LZMAFile provides a *binary* file interface - data read
      46      is returned as bytes, and data to be written must be given as bytes.
      47      """
      48  
      49      def __init__(self, filename=None, mode="r", *,
      50                   format=None, check=-1, preset=None, filters=None):
      51          """Open an LZMA-compressed file in binary mode.
      52  
      53          filename can be either an actual file name (given as a str,
      54          bytes, or PathLike object), in which case the named file is
      55          opened, or it can be an existing file object to read from or
      56          write to.
      57  
      58          mode can be "r" for reading (default), "w" for (over)writing,
      59          "x" for creating exclusively, or "a" for appending. These can
      60          equivalently be given as "rb", "wb", "xb" and "ab" respectively.
      61  
      62          format specifies the container format to use for the file.
      63          If mode is "r", this defaults to FORMAT_AUTO. Otherwise, the
      64          default is FORMAT_XZ.
      65  
      66          check specifies the integrity check to use. This argument can
      67          only be used when opening a file for writing. For FORMAT_XZ,
      68          the default is CHECK_CRC64. FORMAT_ALONE and FORMAT_RAW do not
      69          support integrity checks - for these formats, check must be
      70          omitted, or be CHECK_NONE.
      71  
      72          When opening a file for reading, the *preset* argument is not
      73          meaningful, and should be omitted. The *filters* argument should
      74          also be omitted, except when format is FORMAT_RAW (in which case
      75          it is required).
      76  
      77          When opening a file for writing, the settings used by the
      78          compressor can be specified either as a preset compression
      79          level (with the *preset* argument), or in detail as a custom
      80          filter chain (with the *filters* argument). For FORMAT_XZ and
      81          FORMAT_ALONE, the default is to use the PRESET_DEFAULT preset
      82          level. For FORMAT_RAW, the caller must always specify a filter
      83          chain; the raw compressor does not support preset compression
      84          levels.
      85  
      86          preset (if provided) should be an integer in the range 0-9,
      87          optionally OR-ed with the constant PRESET_EXTREME.
      88  
      89          filters (if provided) should be a sequence of dicts. Each dict
      90          should have an entry for "id" indicating ID of the filter, plus
      91          additional entries for options to the filter.
      92          """
      93          self._fp = None
      94          self._closefp = False
      95          self._mode = _MODE_CLOSED
      96  
      97          if mode in ("r", "rb"):
      98              if check != -1:
      99                  raise ValueError("Cannot specify an integrity check "
     100                                   "when opening a file for reading")
     101              if preset is not None:
     102                  raise ValueError("Cannot specify a preset compression "
     103                                   "level when opening a file for reading")
     104              if format is None:
     105                  format = FORMAT_AUTO
     106              mode_code = _MODE_READ
     107          elif mode in ("w", "wb", "a", "ab", "x", "xb"):
     108              if format is None:
     109                  format = FORMAT_XZ
     110              mode_code = _MODE_WRITE
     111              self._compressor = LZMACompressor(format=format, check=check,
     112                                                preset=preset, filters=filters)
     113              self._pos = 0
     114          else:
     115              raise ValueError("Invalid mode: {!r}".format(mode))
     116  
     117          if isinstance(filename, (str, bytes, os.PathLike)):
     118              if "b" not in mode:
     119                  mode += "b"
     120              self._fp = builtins.open(filename, mode)
     121              self._closefp = True
     122              self._mode = mode_code
     123          elif hasattr(filename, "read") or hasattr(filename, "write"):
     124              self._fp = filename
     125              self._mode = mode_code
     126          else:
     127              raise TypeError("filename must be a str, bytes, file or PathLike object")
     128  
     129          if self._mode == _MODE_READ:
     130              raw = _compression.DecompressReader(self._fp, LZMADecompressor,
     131                  trailing_error=LZMAError, format=format, filters=filters)
     132              self._buffer = io.BufferedReader(raw)
     133  
     134      def close(self):
     135          """Flush and close the file.
     136  
     137          May be called more than once without error. Once the file is
     138          closed, any other operation on it will raise a ValueError.
     139          """
     140          if self._mode == _MODE_CLOSED:
     141              return
     142          try:
     143              if self._mode == _MODE_READ:
     144                  self._buffer.close()
     145                  self._buffer = None
     146              elif self._mode == _MODE_WRITE:
     147                  self._fp.write(self._compressor.flush())
     148                  self._compressor = None
     149          finally:
     150              try:
     151                  if self._closefp:
     152                      self._fp.close()
     153              finally:
     154                  self._fp = None
     155                  self._closefp = False
     156                  self._mode = _MODE_CLOSED
     157  
     158      @property
     159      def closed(self):
     160          """True if this file is closed."""
     161          return self._mode == _MODE_CLOSED
     162  
     163      def fileno(self):
     164          """Return the file descriptor for the underlying file."""
     165          self._check_not_closed()
     166          return self._fp.fileno()
     167  
     168      def seekable(self):
     169          """Return whether the file supports seeking."""
     170          return self.readable() and self._buffer.seekable()
     171  
     172      def readable(self):
     173          """Return whether the file was opened for reading."""
     174          self._check_not_closed()
     175          return self._mode == _MODE_READ
     176  
     177      def writable(self):
     178          """Return whether the file was opened for writing."""
     179          self._check_not_closed()
     180          return self._mode == _MODE_WRITE
     181  
     182      def peek(self, size=-1):
     183          """Return buffered data without advancing the file position.
     184  
     185          Always returns at least one byte of data, unless at EOF.
     186          The exact number of bytes returned is unspecified.
     187          """
     188          self._check_can_read()
     189          # Relies on the undocumented fact that BufferedReader.peek() always
     190          # returns at least one byte (except at EOF)
     191          return self._buffer.peek(size)
     192  
     193      def read(self, size=-1):
     194          """Read up to size uncompressed bytes from the file.
     195  
     196          If size is negative or omitted, read until EOF is reached.
     197          Returns b"" if the file is already at EOF.
     198          """
     199          self._check_can_read()
     200          return self._buffer.read(size)
     201  
     202      def read1(self, size=-1):
     203          """Read up to size uncompressed bytes, while trying to avoid
     204          making multiple reads from the underlying stream. Reads up to a
     205          buffer's worth of data if size is negative.
     206  
     207          Returns b"" if the file is at EOF.
     208          """
     209          self._check_can_read()
     210          if size < 0:
     211              size = io.DEFAULT_BUFFER_SIZE
     212          return self._buffer.read1(size)
     213  
     214      def readline(self, size=-1):
     215          """Read a line of uncompressed bytes from the file.
     216  
     217          The terminating newline (if present) is retained. If size is
     218          non-negative, no more than size bytes will be read (in which
     219          case the line may be incomplete). Returns b'' if already at EOF.
     220          """
     221          self._check_can_read()
     222          return self._buffer.readline(size)
     223  
     224      def write(self, data):
     225          """Write a bytes object to the file.
     226  
     227          Returns the number of uncompressed bytes written, which is
     228          always the length of data in bytes. Note that due to buffering,
     229          the file on disk may not reflect the data written until close()
     230          is called.
     231          """
     232          self._check_can_write()
     233          if isinstance(data, (bytes, bytearray)):
     234              length = len(data)
     235          else:
     236              # accept any data that supports the buffer protocol
     237              data = memoryview(data)
     238              length = data.nbytes
     239  
     240          compressed = self._compressor.compress(data)
     241          self._fp.write(compressed)
     242          self._pos += length
     243          return length
     244  
     245      def seek(self, offset, whence=io.SEEK_SET):
     246          """Change the file position.
     247  
     248          The new position is specified by offset, relative to the
     249          position indicated by whence. Possible values for whence are:
     250  
     251              0: start of stream (default): offset must not be negative
     252              1: current stream position
     253              2: end of stream; offset must not be positive
     254  
     255          Returns the new file position.
     256  
     257          Note that seeking is emulated, so depending on the parameters,
     258          this operation may be extremely slow.
     259          """
     260          self._check_can_seek()
     261          return self._buffer.seek(offset, whence)
     262  
     263      def tell(self):
     264          """Return the current file position."""
     265          self._check_not_closed()
     266          if self._mode == _MODE_READ:
     267              return self._buffer.tell()
     268          return self._pos
     269  
     270  
     271  def open(filename, mode="rb", *,
     272           format=None, check=-1, preset=None, filters=None,
     273           encoding=None, errors=None, newline=None):
     274      """Open an LZMA-compressed file in binary or text mode.
     275  
     276      filename can be either an actual file name (given as a str, bytes,
     277      or PathLike object), in which case the named file is opened, or it
     278      can be an existing file object to read from or write to.
     279  
     280      The mode argument can be "r", "rb" (default), "w", "wb", "x", "xb",
     281      "a", or "ab" for binary mode, or "rt", "wt", "xt", or "at" for text
     282      mode.
     283  
     284      The format, check, preset and filters arguments specify the
     285      compression settings, as for LZMACompressor, LZMADecompressor and
     286      LZMAFile.
     287  
     288      For binary mode, this function is equivalent to the LZMAFile
     289      constructor: LZMAFile(filename, mode, ...). In this case, the
     290      encoding, errors and newline arguments must not be provided.
     291  
     292      For text mode, an LZMAFile object is created, and wrapped in an
     293      io.TextIOWrapper instance with the specified encoding, error
     294      handling behavior, and line ending(s).
     295  
     296      """
     297      if "t" in mode:
     298          if "b" in mode:
     299              raise ValueError("Invalid mode: %r" % (mode,))
     300      else:
     301          if encoding is not None:
     302              raise ValueError("Argument 'encoding' not supported in binary mode")
     303          if errors is not None:
     304              raise ValueError("Argument 'errors' not supported in binary mode")
     305          if newline is not None:
     306              raise ValueError("Argument 'newline' not supported in binary mode")
     307  
     308      lz_mode = mode.replace("t", "")
     309      binary_file = LZMAFile(filename, lz_mode, format=format, check=check,
     310                             preset=preset, filters=filters)
     311  
     312      if "t" in mode:
     313          encoding = io.text_encoding(encoding)
     314          return io.TextIOWrapper(binary_file, encoding, errors, newline)
     315      else:
     316          return binary_file
     317  
     318  
     319  def compress(data, format=FORMAT_XZ, check=-1, preset=None, filters=None):
     320      """Compress a block of data.
     321  
     322      Refer to LZMACompressor's docstring for a description of the
     323      optional arguments *format*, *check*, *preset* and *filters*.
     324  
     325      For incremental compression, use an LZMACompressor instead.
     326      """
     327      comp = LZMACompressor(format, check, preset, filters)
     328      return comp.compress(data) + comp.flush()
     329  
     330  
     331  def decompress(data, format=FORMAT_AUTO, memlimit=None, filters=None):
     332      """Decompress a block of data.
     333  
     334      Refer to LZMADecompressor's docstring for a description of the
     335      optional arguments *format*, *check* and *filters*.
     336  
     337      For incremental decompression, use an LZMADecompressor instead.
     338      """
     339      results = []
     340      while True:
     341          decomp = LZMADecompressor(format, memlimit, filters)
     342          try:
     343              res = decomp.decompress(data)
     344          except LZMAError:
     345              if results:
     346                  break  # Leftover data is not a valid LZMA/XZ stream; ignore it.
     347              else:
     348                  raise  # Error on the first iteration; bail out.
     349          results.append(res)
     350          if not decomp.eof:
     351              raise LZMAError("Compressed data ended before the "
     352                              "end-of-stream marker was reached")
     353          data = decomp.unused_data
     354          if not data:
     355              break
     356      return b"".join(results)