(root)/
Python-3.11.7/
Lib/
bz2.py
       1  """Interface to the libbzip2 compression library.
       2  
       3  This module provides a file interface, classes for incremental
       4  (de)compression, and functions for one-shot (de)compression.
       5  """
       6  
# Names exported by "from bz2 import *".
__all__ = ["BZ2File", "BZ2Compressor", "BZ2Decompressor",
           "open", "compress", "decompress"]

__author__ = "Nadeem Vawda <nadeem.vawda@gmail.com>"
      11  
      12  from builtins import open as _builtin_open
      13  import io
      14  import os
      15  import _compression
      16  
      17  from _bz2 import BZ2Compressor, BZ2Decompressor
      18  
      19  
# Internal state codes stored in BZ2File._mode.
_MODE_CLOSED   = 0  # not open: the initial state, and the state after close()
_MODE_READ     = 1  # opened for reading ("", "r" or "rb")
# Value 2 no longer used
_MODE_WRITE    = 3  # opened for writing ("w", "x" or "a", with optional "b")
      24  
      25  
      26  class ESC[4;38;5;81mBZ2File(ESC[4;38;5;149m_compressionESC[4;38;5;149m.ESC[4;38;5;149mBaseStream):
      27  
      28      """A file object providing transparent bzip2 (de)compression.
      29  
      30      A BZ2File can act as a wrapper for an existing file object, or refer
      31      directly to a named file on disk.
      32  
      33      Note that BZ2File provides a *binary* file interface - data read is
      34      returned as bytes, and data to be written should be given as bytes.
      35      """
      36  
      37      def __init__(self, filename, mode="r", *, compresslevel=9):
      38          """Open a bzip2-compressed file.
      39  
      40          If filename is a str, bytes, or PathLike object, it gives the
      41          name of the file to be opened. Otherwise, it should be a file
      42          object, which will be used to read or write the compressed data.
      43  
      44          mode can be 'r' for reading (default), 'w' for (over)writing,
      45          'x' for creating exclusively, or 'a' for appending. These can
      46          equivalently be given as 'rb', 'wb', 'xb', and 'ab'.
      47  
      48          If mode is 'w', 'x' or 'a', compresslevel can be a number between 1
      49          and 9 specifying the level of compression: 1 produces the least
      50          compression, and 9 (default) produces the most compression.
      51  
      52          If mode is 'r', the input file may be the concatenation of
      53          multiple compressed streams.
      54          """
      55          self._fp = None
      56          self._closefp = False
      57          self._mode = _MODE_CLOSED
      58  
      59          if not (1 <= compresslevel <= 9):
      60              raise ValueError("compresslevel must be between 1 and 9")
      61  
      62          if mode in ("", "r", "rb"):
      63              mode = "rb"
      64              mode_code = _MODE_READ
      65          elif mode in ("w", "wb"):
      66              mode = "wb"
      67              mode_code = _MODE_WRITE
      68              self._compressor = BZ2Compressor(compresslevel)
      69          elif mode in ("x", "xb"):
      70              mode = "xb"
      71              mode_code = _MODE_WRITE
      72              self._compressor = BZ2Compressor(compresslevel)
      73          elif mode in ("a", "ab"):
      74              mode = "ab"
      75              mode_code = _MODE_WRITE
      76              self._compressor = BZ2Compressor(compresslevel)
      77          else:
      78              raise ValueError("Invalid mode: %r" % (mode,))
      79  
      80          if isinstance(filename, (str, bytes, os.PathLike)):
      81              self._fp = _builtin_open(filename, mode)
      82              self._closefp = True
      83              self._mode = mode_code
      84          elif hasattr(filename, "read") or hasattr(filename, "write"):
      85              self._fp = filename
      86              self._mode = mode_code
      87          else:
      88              raise TypeError("filename must be a str, bytes, file or PathLike object")
      89  
      90          if self._mode == _MODE_READ:
      91              raw = _compression.DecompressReader(self._fp,
      92                  BZ2Decompressor, trailing_error=OSError)
      93              self._buffer = io.BufferedReader(raw)
      94          else:
      95              self._pos = 0
      96  
      97      def close(self):
      98          """Flush and close the file.
      99  
     100          May be called more than once without error. Once the file is
     101          closed, any other operation on it will raise a ValueError.
     102          """
     103          if self._mode == _MODE_CLOSED:
     104              return
     105          try:
     106              if self._mode == _MODE_READ:
     107                  self._buffer.close()
     108              elif self._mode == _MODE_WRITE:
     109                  self._fp.write(self._compressor.flush())
     110                  self._compressor = None
     111          finally:
     112              try:
     113                  if self._closefp:
     114                      self._fp.close()
     115              finally:
     116                  self._fp = None
     117                  self._closefp = False
     118                  self._mode = _MODE_CLOSED
     119                  self._buffer = None
     120  
     121      @property
     122      def closed(self):
     123          """True if this file is closed."""
     124          return self._mode == _MODE_CLOSED
     125  
     126      def fileno(self):
     127          """Return the file descriptor for the underlying file."""
     128          self._check_not_closed()
     129          return self._fp.fileno()
     130  
     131      def seekable(self):
     132          """Return whether the file supports seeking."""
     133          return self.readable() and self._buffer.seekable()
     134  
     135      def readable(self):
     136          """Return whether the file was opened for reading."""
     137          self._check_not_closed()
     138          return self._mode == _MODE_READ
     139  
     140      def writable(self):
     141          """Return whether the file was opened for writing."""
     142          self._check_not_closed()
     143          return self._mode == _MODE_WRITE
     144  
     145      def peek(self, n=0):
     146          """Return buffered data without advancing the file position.
     147  
     148          Always returns at least one byte of data, unless at EOF.
     149          The exact number of bytes returned is unspecified.
     150          """
     151          self._check_can_read()
     152          # Relies on the undocumented fact that BufferedReader.peek()
     153          # always returns at least one byte (except at EOF), independent
     154          # of the value of n
     155          return self._buffer.peek(n)
     156  
     157      def read(self, size=-1):
     158          """Read up to size uncompressed bytes from the file.
     159  
     160          If size is negative or omitted, read until EOF is reached.
     161          Returns b'' if the file is already at EOF.
     162          """
     163          self._check_can_read()
     164          return self._buffer.read(size)
     165  
     166      def read1(self, size=-1):
     167          """Read up to size uncompressed bytes, while trying to avoid
     168          making multiple reads from the underlying stream. Reads up to a
     169          buffer's worth of data if size is negative.
     170  
     171          Returns b'' if the file is at EOF.
     172          """
     173          self._check_can_read()
     174          if size < 0:
     175              size = io.DEFAULT_BUFFER_SIZE
     176          return self._buffer.read1(size)
     177  
     178      def readinto(self, b):
     179          """Read bytes into b.
     180  
     181          Returns the number of bytes read (0 for EOF).
     182          """
     183          self._check_can_read()
     184          return self._buffer.readinto(b)
     185  
     186      def readline(self, size=-1):
     187          """Read a line of uncompressed bytes from the file.
     188  
     189          The terminating newline (if present) is retained. If size is
     190          non-negative, no more than size bytes will be read (in which
     191          case the line may be incomplete). Returns b'' if already at EOF.
     192          """
     193          if not isinstance(size, int):
     194              if not hasattr(size, "__index__"):
     195                  raise TypeError("Integer argument expected")
     196              size = size.__index__()
     197          self._check_can_read()
     198          return self._buffer.readline(size)
     199  
     200      def readlines(self, size=-1):
     201          """Read a list of lines of uncompressed bytes from the file.
     202  
     203          size can be specified to control the number of lines read: no
     204          further lines will be read once the total size of the lines read
     205          so far equals or exceeds size.
     206          """
     207          if not isinstance(size, int):
     208              if not hasattr(size, "__index__"):
     209                  raise TypeError("Integer argument expected")
     210              size = size.__index__()
     211          self._check_can_read()
     212          return self._buffer.readlines(size)
     213  
     214      def write(self, data):
     215          """Write a byte string to the file.
     216  
     217          Returns the number of uncompressed bytes written, which is
     218          always the length of data in bytes. Note that due to buffering,
     219          the file on disk may not reflect the data written until close()
     220          is called.
     221          """
     222          self._check_can_write()
     223          if isinstance(data, (bytes, bytearray)):
     224              length = len(data)
     225          else:
     226              # accept any data that supports the buffer protocol
     227              data = memoryview(data)
     228              length = data.nbytes
     229  
     230          compressed = self._compressor.compress(data)
     231          self._fp.write(compressed)
     232          self._pos += length
     233          return length
     234  
     235      def writelines(self, seq):
     236          """Write a sequence of byte strings to the file.
     237  
     238          Returns the number of uncompressed bytes written.
     239          seq can be any iterable yielding byte strings.
     240  
     241          Line separators are not added between the written byte strings.
     242          """
     243          return _compression.BaseStream.writelines(self, seq)
     244  
     245      def seek(self, offset, whence=io.SEEK_SET):
     246          """Change the file position.
     247  
     248          The new position is specified by offset, relative to the
     249          position indicated by whence. Values for whence are:
     250  
     251              0: start of stream (default); offset must not be negative
     252              1: current stream position
     253              2: end of stream; offset must not be positive
     254  
     255          Returns the new file position.
     256  
     257          Note that seeking is emulated, so depending on the parameters,
     258          this operation may be extremely slow.
     259          """
     260          self._check_can_seek()
     261          return self._buffer.seek(offset, whence)
     262  
     263      def tell(self):
     264          """Return the current file position."""
     265          self._check_not_closed()
     266          if self._mode == _MODE_READ:
     267              return self._buffer.tell()
     268          return self._pos
     269  
     270  
     271  def open(filename, mode="rb", compresslevel=9,
     272           encoding=None, errors=None, newline=None):
     273      """Open a bzip2-compressed file in binary or text mode.
     274  
     275      The filename argument can be an actual filename (a str, bytes, or
     276      PathLike object), or an existing file object to read from or write
     277      to.
     278  
     279      The mode argument can be "r", "rb", "w", "wb", "x", "xb", "a" or
     280      "ab" for binary mode, or "rt", "wt", "xt" or "at" for text mode.
     281      The default mode is "rb", and the default compresslevel is 9.
     282  
     283      For binary mode, this function is equivalent to the BZ2File
     284      constructor: BZ2File(filename, mode, compresslevel). In this case,
     285      the encoding, errors and newline arguments must not be provided.
     286  
     287      For text mode, a BZ2File object is created, and wrapped in an
     288      io.TextIOWrapper instance with the specified encoding, error
     289      handling behavior, and line ending(s).
     290  
     291      """
     292      if "t" in mode:
     293          if "b" in mode:
     294              raise ValueError("Invalid mode: %r" % (mode,))
     295      else:
     296          if encoding is not None:
     297              raise ValueError("Argument 'encoding' not supported in binary mode")
     298          if errors is not None:
     299              raise ValueError("Argument 'errors' not supported in binary mode")
     300          if newline is not None:
     301              raise ValueError("Argument 'newline' not supported in binary mode")
     302  
     303      bz_mode = mode.replace("t", "")
     304      binary_file = BZ2File(filename, bz_mode, compresslevel=compresslevel)
     305  
     306      if "t" in mode:
     307          encoding = io.text_encoding(encoding)
     308          return io.TextIOWrapper(binary_file, encoding, errors, newline)
     309      else:
     310          return binary_file
     311  
     312  
     313  def compress(data, compresslevel=9):
     314      """Compress a block of data.
     315  
     316      compresslevel, if given, must be a number between 1 and 9.
     317  
     318      For incremental compression, use a BZ2Compressor object instead.
     319      """
     320      comp = BZ2Compressor(compresslevel)
     321      return comp.compress(data) + comp.flush()
     322  
     323  
     324  def decompress(data):
     325      """Decompress a block of data.
     326  
     327      For incremental decompression, use a BZ2Decompressor object instead.
     328      """
     329      results = []
     330      while data:
     331          decomp = BZ2Decompressor()
     332          try:
     333              res = decomp.decompress(data)
     334          except OSError:
     335              if results:
     336                  break  # Leftover data is not a valid bzip2 stream; ignore it.
     337              else:
     338                  raise  # Error on the first iteration; bail out.
     339          results.append(res)
     340          if not decomp.eof:
     341              raise ValueError("Compressed data ended before the "
     342                               "end-of-stream marker was reached")
     343          data = decomp.unused_data
     344      return b"".join(results)