(root)/
Python-3.12.0/
Lib/
wave.py
       1  """Stuff to parse WAVE files.
       2  
       3  Usage.
       4  
       5  Reading WAVE files:
       6        f = wave.open(file, 'r')
       7  where file is either the name of a file or an open file pointer.
       8  The open file pointer must have methods read(), seek(), and close().
When the setpos() and rewind() methods are not used, the seek()
method is not necessary.
      11  
      12  This returns an instance of a class with the following public methods:
      13        getnchannels()  -- returns number of audio channels (1 for
      14                           mono, 2 for stereo)
      15        getsampwidth()  -- returns sample width in bytes
      16        getframerate()  -- returns sampling frequency
      17        getnframes()    -- returns number of audio frames
      18        getcomptype()   -- returns compression type ('NONE' for linear samples)
      getcompname()   -- returns human-readable version of
                         compression type ('not compressed' for linear samples)
      21        getparams()     -- returns a namedtuple consisting of all of the
      22                           above in the above order
      23        getmarkers()    -- returns None (for compatibility with the
      24                           aifc module)
      25        getmark(id)     -- raises an error since the mark does not
      26                           exist (for compatibility with the aifc module)
      27        readframes(n)   -- returns at most n frames of audio
      28        rewind()        -- rewind to the beginning of the audio stream
      29        setpos(pos)     -- seek to the specified position
      30        tell()          -- return the current position
      31        close()         -- close the instance (make it unusable)
      32  The position returned by tell() and the position given to setpos()
      33  are compatible and have nothing to do with the actual position in the
      34  file.
      35  The close() method is called automatically when the class instance
      36  is destroyed.
      37  
      38  Writing WAVE files:
      39        f = wave.open(file, 'w')
      40  where file is either the name of a file or an open file pointer.
      41  The open file pointer must have methods write(), tell(), seek(), and
      42  close().
      43  
      44  This returns an instance of a class with the following public methods:
      45        setnchannels(n) -- set the number of channels
      46        setsampwidth(n) -- set the sample width
      47        setframerate(n) -- set the frame rate
      48        setnframes(n)   -- set the number of frames
      49        setcomptype(type, name)
      50                        -- set the compression type and the
      51                           human-readable compression type
      52        setparams(tuple)
      53                        -- set all parameters at once
      54        tell()          -- return current position in output file
      55        writeframesraw(data)
      56                        -- write audio frames without patching up the
      57                           file header
      58        writeframes(data)
      59                        -- write audio frames and patch up the file header
      60        close()         -- patch up the file header and close the
      61                           output file
      62  You should set the parameters before the first writeframesraw or
      63  writeframes.  The total number of frames does not need to be set,
      64  but when it is set to the correct value, the header does not have to
      65  be patched up.
It is best to first set all parameters, possibly including the
compression type, and then write audio frames using writeframesraw.
      68  When all frames have been written, either call writeframes(b'') or
      69  close() to patch up the sizes in the header.
      70  The close() method is called automatically when the class instance
      71  is destroyed.
      72  """
      73  
from collections import namedtuple
import builtins
import os
import struct
import sys
      78  
      79  
# Names exported by "from wave import *".
__all__ = ["open", "Error", "Wave_read", "Wave_write"]
      81  
      82  class ESC[4;38;5;81mError(ESC[4;38;5;149mException):
      83      pass
      84  
# wFormatTag values from the 'fmt ' chunk that Wave_read accepts; every
# other tag is rejected with Error('unknown format: ...').
WAVE_FORMAT_PCM = 0x0001
WAVE_FORMAT_EXTENSIBLE = 0xFFFE
# SubFormat value a WAVE_FORMAT_EXTENSIBLE file must carry to be accepted.
# Derived from uuid.UUID("00000001-0000-0010-8000-00aa00389b71").bytes_le
KSDATAFORMAT_SUBTYPE_PCM = b'\x01\x00\x00\x00\x00\x00\x10\x00\x80\x00\x00\xaa\x008\x9bq'

# Presumably array-module typecodes indexed by sample width in bytes
# (None where no typecode applies) -- TODO confirm; nothing in the code
# visible here reads this tuple.
_array_fmts = None, 'b', 'h', None, 'i'

# Result type of getparams() on both Wave_read and Wave_write.
_wave_params = namedtuple('_wave_params',
                     'nchannels sampwidth framerate nframes comptype compname')
      94  
      95  
      96  def _byteswap(data, width):
      97      swapped_data = bytearray(len(data))
      98  
      99      for i in range(0, len(data), width):
     100          for j in range(width):
     101              swapped_data[i + width - 1 - j] = data[i + j]
     102  
     103      return bytes(swapped_data)
     104  
     105  
     106  class ESC[4;38;5;81m_Chunk:
     107      def __init__(self, file, align=True, bigendian=True, inclheader=False):
     108          self.closed = False
     109          self.align = align      # whether to align to word (2-byte) boundaries
     110          if bigendian:
     111              strflag = '>'
     112          else:
     113              strflag = '<'
     114          self.file = file
     115          self.chunkname = file.read(4)
     116          if len(self.chunkname) < 4:
     117              raise EOFError
     118          try:
     119              self.chunksize = struct.unpack_from(strflag+'L', file.read(4))[0]
     120          except struct.error:
     121              raise EOFError from None
     122          if inclheader:
     123              self.chunksize = self.chunksize - 8 # subtract header
     124          self.size_read = 0
     125          try:
     126              self.offset = self.file.tell()
     127          except (AttributeError, OSError):
     128              self.seekable = False
     129          else:
     130              self.seekable = True
     131  
     132      def getname(self):
     133          """Return the name (ID) of the current chunk."""
     134          return self.chunkname
     135  
     136      def close(self):
     137          if not self.closed:
     138              try:
     139                  self.skip()
     140              finally:
     141                  self.closed = True
     142  
     143      def seek(self, pos, whence=0):
     144          """Seek to specified position into the chunk.
     145          Default position is 0 (start of chunk).
     146          If the file is not seekable, this will result in an error.
     147          """
     148  
     149          if self.closed:
     150              raise ValueError("I/O operation on closed file")
     151          if not self.seekable:
     152              raise OSError("cannot seek")
     153          if whence == 1:
     154              pos = pos + self.size_read
     155          elif whence == 2:
     156              pos = pos + self.chunksize
     157          if pos < 0 or pos > self.chunksize:
     158              raise RuntimeError
     159          self.file.seek(self.offset + pos, 0)
     160          self.size_read = pos
     161  
     162      def tell(self):
     163          if self.closed:
     164              raise ValueError("I/O operation on closed file")
     165          return self.size_read
     166  
     167      def read(self, size=-1):
     168          """Read at most size bytes from the chunk.
     169          If size is omitted or negative, read until the end
     170          of the chunk.
     171          """
     172  
     173          if self.closed:
     174              raise ValueError("I/O operation on closed file")
     175          if self.size_read >= self.chunksize:
     176              return b''
     177          if size < 0:
     178              size = self.chunksize - self.size_read
     179          if size > self.chunksize - self.size_read:
     180              size = self.chunksize - self.size_read
     181          data = self.file.read(size)
     182          self.size_read = self.size_read + len(data)
     183          if self.size_read == self.chunksize and \
     184             self.align and \
     185             (self.chunksize & 1):
     186              dummy = self.file.read(1)
     187              self.size_read = self.size_read + len(dummy)
     188          return data
     189  
     190      def skip(self):
     191          """Skip the rest of the chunk.
     192          If you are not interested in the contents of the chunk,
     193          this method should be called so that the file points to
     194          the start of the next chunk.
     195          """
     196  
     197          if self.closed:
     198              raise ValueError("I/O operation on closed file")
     199          if self.seekable:
     200              try:
     201                  n = self.chunksize - self.size_read
     202                  # maybe fix alignment
     203                  if self.align and (self.chunksize & 1):
     204                      n = n + 1
     205                  self.file.seek(n, 1)
     206                  self.size_read = self.size_read + n
     207                  return
     208              except OSError:
     209                  pass
     210          while self.size_read < self.chunksize:
     211              n = min(8192, self.chunksize - self.size_read)
     212              dummy = self.read(n)
     213              if not dummy:
     214                  raise EOFError
     215  
     216  
     217  class ESC[4;38;5;81mWave_read:
     218      """Variables used in this class:
     219  
     220      These variables are available to the user though appropriate
     221      methods of this class:
     222      _file -- the open file with methods read(), close(), and seek()
     223                set through the __init__() method
     224      _nchannels -- the number of audio channels
     225                available through the getnchannels() method
     226      _nframes -- the number of audio frames
     227                available through the getnframes() method
     228      _sampwidth -- the number of bytes per audio sample
     229                available through the getsampwidth() method
     230      _framerate -- the sampling frequency
     231                available through the getframerate() method
     232      _comptype -- the AIFF-C compression type ('NONE' if AIFF)
     233                available through the getcomptype() method
     234      _compname -- the human-readable AIFF-C compression type
     235                available through the getcomptype() method
     236      _soundpos -- the position in the audio stream
     237                available through the tell() method, set through the
     238                setpos() method
     239  
     240      These variables are used internally only:
     241      _fmt_chunk_read -- 1 iff the FMT chunk has been read
     242      _data_seek_needed -- 1 iff positioned correctly in audio
     243                file for readframes()
     244      _data_chunk -- instantiation of a chunk class for the DATA chunk
     245      _framesize -- size of one frame in the file
     246      """
     247  
     248      def initfp(self, file):
     249          self._convert = None
     250          self._soundpos = 0
     251          self._file = _Chunk(file, bigendian = 0)
     252          if self._file.getname() != b'RIFF':
     253              raise Error('file does not start with RIFF id')
     254          if self._file.read(4) != b'WAVE':
     255              raise Error('not a WAVE file')
     256          self._fmt_chunk_read = 0
     257          self._data_chunk = None
     258          while 1:
     259              self._data_seek_needed = 1
     260              try:
     261                  chunk = _Chunk(self._file, bigendian = 0)
     262              except EOFError:
     263                  break
     264              chunkname = chunk.getname()
     265              if chunkname == b'fmt ':
     266                  self._read_fmt_chunk(chunk)
     267                  self._fmt_chunk_read = 1
     268              elif chunkname == b'data':
     269                  if not self._fmt_chunk_read:
     270                      raise Error('data chunk before fmt chunk')
     271                  self._data_chunk = chunk
     272                  self._nframes = chunk.chunksize // self._framesize
     273                  self._data_seek_needed = 0
     274                  break
     275              chunk.skip()
     276          if not self._fmt_chunk_read or not self._data_chunk:
     277              raise Error('fmt chunk and/or data chunk missing')
     278  
     279      def __init__(self, f):
     280          self._i_opened_the_file = None
     281          if isinstance(f, str):
     282              f = builtins.open(f, 'rb')
     283              self._i_opened_the_file = f
     284          # else, assume it is an open file object already
     285          try:
     286              self.initfp(f)
     287          except:
     288              if self._i_opened_the_file:
     289                  f.close()
     290              raise
     291  
     292      def __del__(self):
     293          self.close()
     294  
     295      def __enter__(self):
     296          return self
     297  
     298      def __exit__(self, *args):
     299          self.close()
     300  
     301      #
     302      # User visible methods.
     303      #
     304      def getfp(self):
     305          return self._file
     306  
     307      def rewind(self):
     308          self._data_seek_needed = 1
     309          self._soundpos = 0
     310  
     311      def close(self):
     312          self._file = None
     313          file = self._i_opened_the_file
     314          if file:
     315              self._i_opened_the_file = None
     316              file.close()
     317  
     318      def tell(self):
     319          return self._soundpos
     320  
     321      def getnchannels(self):
     322          return self._nchannels
     323  
     324      def getnframes(self):
     325          return self._nframes
     326  
     327      def getsampwidth(self):
     328          return self._sampwidth
     329  
     330      def getframerate(self):
     331          return self._framerate
     332  
     333      def getcomptype(self):
     334          return self._comptype
     335  
     336      def getcompname(self):
     337          return self._compname
     338  
     339      def getparams(self):
     340          return _wave_params(self.getnchannels(), self.getsampwidth(),
     341                         self.getframerate(), self.getnframes(),
     342                         self.getcomptype(), self.getcompname())
     343  
     344      def getmarkers(self):
     345          return None
     346  
     347      def getmark(self, id):
     348          raise Error('no marks')
     349  
     350      def setpos(self, pos):
     351          if pos < 0 or pos > self._nframes:
     352              raise Error('position not in range')
     353          self._soundpos = pos
     354          self._data_seek_needed = 1
     355  
     356      def readframes(self, nframes):
     357          if self._data_seek_needed:
     358              self._data_chunk.seek(0, 0)
     359              pos = self._soundpos * self._framesize
     360              if pos:
     361                  self._data_chunk.seek(pos, 0)
     362              self._data_seek_needed = 0
     363          if nframes == 0:
     364              return b''
     365          data = self._data_chunk.read(nframes * self._framesize)
     366          if self._sampwidth != 1 and sys.byteorder == 'big':
     367              data = _byteswap(data, self._sampwidth)
     368          if self._convert and data:
     369              data = self._convert(data)
     370          self._soundpos = self._soundpos + len(data) // (self._nchannels * self._sampwidth)
     371          return data
     372  
     373      #
     374      # Internal methods.
     375      #
     376  
     377      def _read_fmt_chunk(self, chunk):
     378          try:
     379              wFormatTag, self._nchannels, self._framerate, dwAvgBytesPerSec, wBlockAlign = struct.unpack_from('<HHLLH', chunk.read(14))
     380          except struct.error:
     381              raise EOFError from None
     382          if wFormatTag != WAVE_FORMAT_PCM and wFormatTag != WAVE_FORMAT_EXTENSIBLE:
     383              raise Error('unknown format: %r' % (wFormatTag,))
     384          try:
     385              sampwidth = struct.unpack_from('<H', chunk.read(2))[0]
     386          except struct.error:
     387              raise EOFError from None
     388          if wFormatTag == WAVE_FORMAT_EXTENSIBLE:
     389              try:
     390                  cbSize, wValidBitsPerSample, dwChannelMask = struct.unpack_from('<HHL', chunk.read(8))
     391                  # Read the entire UUID from the chunk
     392                  SubFormat = chunk.read(16)
     393                  if len(SubFormat) < 16:
     394                      raise EOFError
     395              except struct.error:
     396                  raise EOFError from None
     397              if SubFormat != KSDATAFORMAT_SUBTYPE_PCM:
     398                  try:
     399                      import uuid
     400                      subformat_msg = f'unknown extended format: {uuid.UUID(bytes_le=SubFormat)}'
     401                  except Exception:
     402                      subformat_msg = 'unknown extended format'
     403                  raise Error(subformat_msg)
     404          self._sampwidth = (sampwidth + 7) // 8
     405          if not self._sampwidth:
     406              raise Error('bad sample width')
     407          if not self._nchannels:
     408              raise Error('bad # of channels')
     409          self._framesize = self._nchannels * self._sampwidth
     410          self._comptype = 'NONE'
     411          self._compname = 'not compressed'
     412  
     413  
     414  class ESC[4;38;5;81mWave_write:
     415      """Variables used in this class:
     416  
     417      These variables are user settable through appropriate methods
     418      of this class:
     419      _file -- the open file with methods write(), close(), tell(), seek()
     420                set through the __init__() method
     421      _comptype -- the AIFF-C compression type ('NONE' in AIFF)
     422                set through the setcomptype() or setparams() method
     423      _compname -- the human-readable AIFF-C compression type
     424                set through the setcomptype() or setparams() method
     425      _nchannels -- the number of audio channels
     426                set through the setnchannels() or setparams() method
     427      _sampwidth -- the number of bytes per audio sample
     428                set through the setsampwidth() or setparams() method
     429      _framerate -- the sampling frequency
     430                set through the setframerate() or setparams() method
     431      _nframes -- the number of audio frames written to the header
     432                set through the setnframes() or setparams() method
     433  
     434      These variables are used internally only:
     435      _datalength -- the size of the audio samples written to the header
     436      _nframeswritten -- the number of frames actually written
     437      _datawritten -- the size of the audio samples actually written
     438      """
     439  
     440      def __init__(self, f):
     441          self._i_opened_the_file = None
     442          if isinstance(f, str):
     443              f = builtins.open(f, 'wb')
     444              self._i_opened_the_file = f
     445          try:
     446              self.initfp(f)
     447          except:
     448              if self._i_opened_the_file:
     449                  f.close()
     450              raise
     451  
     452      def initfp(self, file):
     453          self._file = file
     454          self._convert = None
     455          self._nchannels = 0
     456          self._sampwidth = 0
     457          self._framerate = 0
     458          self._nframes = 0
     459          self._nframeswritten = 0
     460          self._datawritten = 0
     461          self._datalength = 0
     462          self._headerwritten = False
     463  
     464      def __del__(self):
     465          self.close()
     466  
     467      def __enter__(self):
     468          return self
     469  
     470      def __exit__(self, *args):
     471          self.close()
     472  
     473      #
     474      # User visible methods.
     475      #
     476      def setnchannels(self, nchannels):
     477          if self._datawritten:
     478              raise Error('cannot change parameters after starting to write')
     479          if nchannels < 1:
     480              raise Error('bad # of channels')
     481          self._nchannels = nchannels
     482  
     483      def getnchannels(self):
     484          if not self._nchannels:
     485              raise Error('number of channels not set')
     486          return self._nchannels
     487  
     488      def setsampwidth(self, sampwidth):
     489          if self._datawritten:
     490              raise Error('cannot change parameters after starting to write')
     491          if sampwidth < 1 or sampwidth > 4:
     492              raise Error('bad sample width')
     493          self._sampwidth = sampwidth
     494  
     495      def getsampwidth(self):
     496          if not self._sampwidth:
     497              raise Error('sample width not set')
     498          return self._sampwidth
     499  
     500      def setframerate(self, framerate):
     501          if self._datawritten:
     502              raise Error('cannot change parameters after starting to write')
     503          if framerate <= 0:
     504              raise Error('bad frame rate')
     505          self._framerate = int(round(framerate))
     506  
     507      def getframerate(self):
     508          if not self._framerate:
     509              raise Error('frame rate not set')
     510          return self._framerate
     511  
     512      def setnframes(self, nframes):
     513          if self._datawritten:
     514              raise Error('cannot change parameters after starting to write')
     515          self._nframes = nframes
     516  
     517      def getnframes(self):
     518          return self._nframeswritten
     519  
     520      def setcomptype(self, comptype, compname):
     521          if self._datawritten:
     522              raise Error('cannot change parameters after starting to write')
     523          if comptype not in ('NONE',):
     524              raise Error('unsupported compression type')
     525          self._comptype = comptype
     526          self._compname = compname
     527  
     528      def getcomptype(self):
     529          return self._comptype
     530  
     531      def getcompname(self):
     532          return self._compname
     533  
     534      def setparams(self, params):
     535          nchannels, sampwidth, framerate, nframes, comptype, compname = params
     536          if self._datawritten:
     537              raise Error('cannot change parameters after starting to write')
     538          self.setnchannels(nchannels)
     539          self.setsampwidth(sampwidth)
     540          self.setframerate(framerate)
     541          self.setnframes(nframes)
     542          self.setcomptype(comptype, compname)
     543  
     544      def getparams(self):
     545          if not self._nchannels or not self._sampwidth or not self._framerate:
     546              raise Error('not all parameters set')
     547          return _wave_params(self._nchannels, self._sampwidth, self._framerate,
     548                self._nframes, self._comptype, self._compname)
     549  
     550      def setmark(self, id, pos, name):
     551          raise Error('setmark() not supported')
     552  
     553      def getmark(self, id):
     554          raise Error('no marks')
     555  
     556      def getmarkers(self):
     557          return None
     558  
     559      def tell(self):
     560          return self._nframeswritten
     561  
     562      def writeframesraw(self, data):
     563          if not isinstance(data, (bytes, bytearray)):
     564              data = memoryview(data).cast('B')
     565          self._ensure_header_written(len(data))
     566          nframes = len(data) // (self._sampwidth * self._nchannels)
     567          if self._convert:
     568              data = self._convert(data)
     569          if self._sampwidth != 1 and sys.byteorder == 'big':
     570              data = _byteswap(data, self._sampwidth)
     571          self._file.write(data)
     572          self._datawritten += len(data)
     573          self._nframeswritten = self._nframeswritten + nframes
     574  
     575      def writeframes(self, data):
     576          self.writeframesraw(data)
     577          if self._datalength != self._datawritten:
     578              self._patchheader()
     579  
     580      def close(self):
     581          try:
     582              if self._file:
     583                  self._ensure_header_written(0)
     584                  if self._datalength != self._datawritten:
     585                      self._patchheader()
     586                  self._file.flush()
     587          finally:
     588              self._file = None
     589              file = self._i_opened_the_file
     590              if file:
     591                  self._i_opened_the_file = None
     592                  file.close()
     593  
     594      #
     595      # Internal methods.
     596      #
     597  
     598      def _ensure_header_written(self, datasize):
     599          if not self._headerwritten:
     600              if not self._nchannels:
     601                  raise Error('# channels not specified')
     602              if not self._sampwidth:
     603                  raise Error('sample width not specified')
     604              if not self._framerate:
     605                  raise Error('sampling rate not specified')
     606              self._write_header(datasize)
     607  
     608      def _write_header(self, initlength):
     609          assert not self._headerwritten
     610          self._file.write(b'RIFF')
     611          if not self._nframes:
     612              self._nframes = initlength // (self._nchannels * self._sampwidth)
     613          self._datalength = self._nframes * self._nchannels * self._sampwidth
     614          try:
     615              self._form_length_pos = self._file.tell()
     616          except (AttributeError, OSError):
     617              self._form_length_pos = None
     618          self._file.write(struct.pack('<L4s4sLHHLLHH4s',
     619              36 + self._datalength, b'WAVE', b'fmt ', 16,
     620              WAVE_FORMAT_PCM, self._nchannels, self._framerate,
     621              self._nchannels * self._framerate * self._sampwidth,
     622              self._nchannels * self._sampwidth,
     623              self._sampwidth * 8, b'data'))
     624          if self._form_length_pos is not None:
     625              self._data_length_pos = self._file.tell()
     626          self._file.write(struct.pack('<L', self._datalength))
     627          self._headerwritten = True
     628  
     629      def _patchheader(self):
     630          assert self._headerwritten
     631          if self._datawritten == self._datalength:
     632              return
     633          curpos = self._file.tell()
     634          self._file.seek(self._form_length_pos, 0)
     635          self._file.write(struct.pack('<L', 36 + self._datawritten))
     636          self._file.seek(self._data_length_pos, 0)
     637          self._file.write(struct.pack('<L', self._datawritten))
     638          self._file.seek(curpos, 0)
     639          self._datalength = self._datawritten
     640  
     641  
     642  def open(f, mode=None):
     643      if mode is None:
     644          if hasattr(f, 'mode'):
     645              mode = f.mode
     646          else:
     647              mode = 'rb'
     648      if mode in ('r', 'rb'):
     649          return Wave_read(f)
     650      elif mode in ('w', 'wb'):
     651          return Wave_write(f)
     652      else:
     653          raise Error("mode must be 'r', 'rb', 'w', or 'wb'")