python (3.11.7)

(root)/
lib/
python3.11/
xml/
sax/
expatreader.py
       1  """
       2  SAX driver for the pyexpat C module.  This driver works with
       3  pyexpat.__version__ == '2.22'.
       4  """
       5  
       6  version = "0.20"
       7  
       8  from xml.sax._exceptions import *
       9  from xml.sax.handler import feature_validation, feature_namespaces
      10  from xml.sax.handler import feature_namespace_prefixes
      11  from xml.sax.handler import feature_external_ges, feature_external_pes
      12  from xml.sax.handler import feature_string_interning
      13  from xml.sax.handler import property_xml_string, property_interning_dict
      14  
      15  # xml.parsers.expat does not raise ImportError in Jython
      16  import sys
      17  if sys.platform[:4] == "java":
      18      raise SAXReaderNotAvailable("expat not available in Java", None)
      19  del sys
      20  
      21  try:
      22      from xml.parsers import expat
      23  except ImportError:
      24      raise SAXReaderNotAvailable("expat not supported", None)
      25  else:
      26      if not hasattr(expat, "ParserCreate"):
      27          raise SAXReaderNotAvailable("expat not supported", None)
      28  from xml.sax import xmlreader, saxutils, handler
      29  
      30  AttributesImpl = xmlreader.AttributesImpl
      31  AttributesNSImpl = xmlreader.AttributesNSImpl
      32  
      33  # If we're using a sufficiently recent version of Python, we can use
      34  # weak references to avoid cycles between the parser and content
      35  # handler, otherwise we'll just have to pretend.
      36  try:
      37      import _weakref
      38  except ImportError:
      39      def _mkproxy(o):
      40          return o
      41  else:
      42      import weakref
      43      _mkproxy = weakref.proxy
      44      del weakref, _weakref
      45  
      46  class ESC[4;38;5;81m_ClosedParser:
      47      pass
      48  
      49  # --- ExpatLocator
      50  
      51  class ESC[4;38;5;81mExpatLocator(ESC[4;38;5;149mxmlreaderESC[4;38;5;149m.ESC[4;38;5;149mLocator):
      52      """Locator for use with the ExpatParser class.
      53  
      54      This uses a weak reference to the parser object to avoid creating
      55      a circular reference between the parser and the content handler.
      56      """
      57      def __init__(self, parser):
      58          self._ref = _mkproxy(parser)
      59  
      60      def getColumnNumber(self):
      61          parser = self._ref
      62          if parser._parser is None:
      63              return None
      64          return parser._parser.ErrorColumnNumber
      65  
      66      def getLineNumber(self):
      67          parser = self._ref
      68          if parser._parser is None:
      69              return 1
      70          return parser._parser.ErrorLineNumber
      71  
      72      def getPublicId(self):
      73          parser = self._ref
      74          if parser is None:
      75              return None
      76          return parser._source.getPublicId()
      77  
      78      def getSystemId(self):
      79          parser = self._ref
      80          if parser is None:
      81              return None
      82          return parser._source.getSystemId()
      83  
      84  
      85  # --- ExpatParser
      86  
      87  class ESC[4;38;5;81mExpatParser(ESC[4;38;5;149mxmlreaderESC[4;38;5;149m.ESC[4;38;5;149mIncrementalParser, ESC[4;38;5;149mxmlreaderESC[4;38;5;149m.ESC[4;38;5;149mLocator):
      88      """SAX driver for the pyexpat C module."""
      89  
      90      def __init__(self, namespaceHandling=0, bufsize=2**16-20):
      91          xmlreader.IncrementalParser.__init__(self, bufsize)
      92          self._source = xmlreader.InputSource()
      93          self._parser = None
      94          self._namespaces = namespaceHandling
      95          self._lex_handler_prop = None
      96          self._parsing = False
      97          self._entity_stack = []
      98          self._external_ges = 0
      99          self._interning = None
     100  
     101      # XMLReader methods
     102  
     103      def parse(self, source):
     104          "Parse an XML document from a URL or an InputSource."
     105          source = saxutils.prepare_input_source(source)
     106  
     107          self._source = source
     108          try:
     109              self.reset()
     110              self._cont_handler.setDocumentLocator(ExpatLocator(self))
     111              xmlreader.IncrementalParser.parse(self, source)
     112          except:
     113              # bpo-30264: Close the source on error to not leak resources:
     114              # xml.sax.parse() doesn't give access to the underlying parser
     115              # to the caller
     116              self._close_source()
     117              raise
     118  
     119      def prepareParser(self, source):
     120          if source.getSystemId() is not None:
     121              self._parser.SetBase(source.getSystemId())
     122  
     123      # Redefined setContentHandler to allow changing handlers during parsing
     124  
     125      def setContentHandler(self, handler):
     126          xmlreader.IncrementalParser.setContentHandler(self, handler)
     127          if self._parsing:
     128              self._reset_cont_handler()
     129  
     130      def getFeature(self, name):
     131          if name == feature_namespaces:
     132              return self._namespaces
     133          elif name == feature_string_interning:
     134              return self._interning is not None
     135          elif name in (feature_validation, feature_external_pes,
     136                        feature_namespace_prefixes):
     137              return 0
     138          elif name == feature_external_ges:
     139              return self._external_ges
     140          raise SAXNotRecognizedException("Feature '%s' not recognized" % name)
     141  
     142      def setFeature(self, name, state):
     143          if self._parsing:
     144              raise SAXNotSupportedException("Cannot set features while parsing")
     145  
     146          if name == feature_namespaces:
     147              self._namespaces = state
     148          elif name == feature_external_ges:
     149              self._external_ges = state
     150          elif name == feature_string_interning:
     151              if state:
     152                  if self._interning is None:
     153                      self._interning = {}
     154              else:
     155                  self._interning = None
     156          elif name == feature_validation:
     157              if state:
     158                  raise SAXNotSupportedException(
     159                      "expat does not support validation")
     160          elif name == feature_external_pes:
     161              if state:
     162                  raise SAXNotSupportedException(
     163                      "expat does not read external parameter entities")
     164          elif name == feature_namespace_prefixes:
     165              if state:
     166                  raise SAXNotSupportedException(
     167                      "expat does not report namespace prefixes")
     168          else:
     169              raise SAXNotRecognizedException(
     170                  "Feature '%s' not recognized" % name)
     171  
     172      def getProperty(self, name):
     173          if name == handler.property_lexical_handler:
     174              return self._lex_handler_prop
     175          elif name == property_interning_dict:
     176              return self._interning
     177          elif name == property_xml_string:
     178              if self._parser:
     179                  if hasattr(self._parser, "GetInputContext"):
     180                      return self._parser.GetInputContext()
     181                  else:
     182                      raise SAXNotRecognizedException(
     183                          "This version of expat does not support getting"
     184                          " the XML string")
     185              else:
     186                  raise SAXNotSupportedException(
     187                      "XML string cannot be returned when not parsing")
     188          raise SAXNotRecognizedException("Property '%s' not recognized" % name)
     189  
     190      def setProperty(self, name, value):
     191          if name == handler.property_lexical_handler:
     192              self._lex_handler_prop = value
     193              if self._parsing:
     194                  self._reset_lex_handler_prop()
     195          elif name == property_interning_dict:
     196              self._interning = value
     197          elif name == property_xml_string:
     198              raise SAXNotSupportedException("Property '%s' cannot be set" %
     199                                             name)
     200          else:
     201              raise SAXNotRecognizedException("Property '%s' not recognized" %
     202                                              name)
     203  
     204      # IncrementalParser methods
     205  
     206      def feed(self, data, isFinal=False):
     207          if not self._parsing:
     208              self.reset()
     209              self._parsing = True
     210              self._cont_handler.startDocument()
     211  
     212          try:
     213              # The isFinal parameter is internal to the expat reader.
     214              # If it is set to true, expat will check validity of the entire
     215              # document. When feeding chunks, they are not normally final -
     216              # except when invoked from close.
     217              self._parser.Parse(data, isFinal)
     218          except expat.error as e:
     219              exc = SAXParseException(expat.ErrorString(e.code), e, self)
     220              # FIXME: when to invoke error()?
     221              self._err_handler.fatalError(exc)
     222  
     223      def _close_source(self):
     224          source = self._source
     225          try:
     226              file = source.getCharacterStream()
     227              if file is not None:
     228                  file.close()
     229          finally:
     230              file = source.getByteStream()
     231              if file is not None:
     232                  file.close()
     233  
     234      def close(self):
     235          if (self._entity_stack or self._parser is None or
     236              isinstance(self._parser, _ClosedParser)):
     237              # If we are completing an external entity, do nothing here
     238              return
     239          try:
     240              self.feed(b"", isFinal=True)
     241              self._cont_handler.endDocument()
     242              self._parsing = False
     243              # break cycle created by expat handlers pointing to our methods
     244              self._parser = None
     245          finally:
     246              self._parsing = False
     247              if self._parser is not None:
     248                  # Keep ErrorColumnNumber and ErrorLineNumber after closing.
     249                  parser = _ClosedParser()
     250                  parser.ErrorColumnNumber = self._parser.ErrorColumnNumber
     251                  parser.ErrorLineNumber = self._parser.ErrorLineNumber
     252                  self._parser = parser
     253              self._close_source()
     254  
     255      def _reset_cont_handler(self):
     256          self._parser.ProcessingInstructionHandler = \
     257                                      self._cont_handler.processingInstruction
     258          self._parser.CharacterDataHandler = self._cont_handler.characters
     259  
     260      def _reset_lex_handler_prop(self):
     261          lex = self._lex_handler_prop
     262          parser = self._parser
     263          if lex is None:
     264              parser.CommentHandler = None
     265              parser.StartCdataSectionHandler = None
     266              parser.EndCdataSectionHandler = None
     267              parser.StartDoctypeDeclHandler = None
     268              parser.EndDoctypeDeclHandler = None
     269          else:
     270              parser.CommentHandler = lex.comment
     271              parser.StartCdataSectionHandler = lex.startCDATA
     272              parser.EndCdataSectionHandler = lex.endCDATA
     273              parser.StartDoctypeDeclHandler = self.start_doctype_decl
     274              parser.EndDoctypeDeclHandler = lex.endDTD
     275  
     276      def reset(self):
     277          if self._namespaces:
     278              self._parser = expat.ParserCreate(self._source.getEncoding(), " ",
     279                                                intern=self._interning)
     280              self._parser.namespace_prefixes = 1
     281              self._parser.StartElementHandler = self.start_element_ns
     282              self._parser.EndElementHandler = self.end_element_ns
     283          else:
     284              self._parser = expat.ParserCreate(self._source.getEncoding(),
     285                                                intern = self._interning)
     286              self._parser.StartElementHandler = self.start_element
     287              self._parser.EndElementHandler = self.end_element
     288  
     289          self._reset_cont_handler()
     290          self._parser.UnparsedEntityDeclHandler = self.unparsed_entity_decl
     291          self._parser.NotationDeclHandler = self.notation_decl
     292          self._parser.StartNamespaceDeclHandler = self.start_namespace_decl
     293          self._parser.EndNamespaceDeclHandler = self.end_namespace_decl
     294  
     295          self._decl_handler_prop = None
     296          if self._lex_handler_prop:
     297              self._reset_lex_handler_prop()
     298  #         self._parser.DefaultHandler =
     299  #         self._parser.DefaultHandlerExpand =
     300  #         self._parser.NotStandaloneHandler =
     301          self._parser.ExternalEntityRefHandler = self.external_entity_ref
     302          try:
     303              self._parser.SkippedEntityHandler = self.skipped_entity_handler
     304          except AttributeError:
     305              # This pyexpat does not support SkippedEntity
     306              pass
     307          self._parser.SetParamEntityParsing(
     308              expat.XML_PARAM_ENTITY_PARSING_UNLESS_STANDALONE)
     309  
     310          self._parsing = False
     311          self._entity_stack = []
     312  
     313      # Locator methods
     314  
     315      def getColumnNumber(self):
     316          if self._parser is None:
     317              return None
     318          return self._parser.ErrorColumnNumber
     319  
     320      def getLineNumber(self):
     321          if self._parser is None:
     322              return 1
     323          return self._parser.ErrorLineNumber
     324  
     325      def getPublicId(self):
     326          return self._source.getPublicId()
     327  
     328      def getSystemId(self):
     329          return self._source.getSystemId()
     330  
     331      # event handlers
     332      def start_element(self, name, attrs):
     333          self._cont_handler.startElement(name, AttributesImpl(attrs))
     334  
     335      def end_element(self, name):
     336          self._cont_handler.endElement(name)
     337  
     338      def start_element_ns(self, name, attrs):
     339          pair = name.split()
     340          if len(pair) == 1:
     341              # no namespace
     342              pair = (None, name)
     343          elif len(pair) == 3:
     344              pair = pair[0], pair[1]
     345          else:
     346              # default namespace
     347              pair = tuple(pair)
     348  
     349          newattrs = {}
     350          qnames = {}
     351          for (aname, value) in attrs.items():
     352              parts = aname.split()
     353              length = len(parts)
     354              if length == 1:
     355                  # no namespace
     356                  qname = aname
     357                  apair = (None, aname)
     358              elif length == 3:
     359                  qname = "%s:%s" % (parts[2], parts[1])
     360                  apair = parts[0], parts[1]
     361              else:
     362                  # default namespace
     363                  qname = parts[1]
     364                  apair = tuple(parts)
     365  
     366              newattrs[apair] = value
     367              qnames[apair] = qname
     368  
     369          self._cont_handler.startElementNS(pair, None,
     370                                            AttributesNSImpl(newattrs, qnames))
     371  
     372      def end_element_ns(self, name):
     373          pair = name.split()
     374          if len(pair) == 1:
     375              pair = (None, name)
     376          elif len(pair) == 3:
     377              pair = pair[0], pair[1]
     378          else:
     379              pair = tuple(pair)
     380  
     381          self._cont_handler.endElementNS(pair, None)
     382  
     383      # this is not used (call directly to ContentHandler)
     384      def processing_instruction(self, target, data):
     385          self._cont_handler.processingInstruction(target, data)
     386  
     387      # this is not used (call directly to ContentHandler)
     388      def character_data(self, data):
     389          self._cont_handler.characters(data)
     390  
     391      def start_namespace_decl(self, prefix, uri):
     392          self._cont_handler.startPrefixMapping(prefix, uri)
     393  
     394      def end_namespace_decl(self, prefix):
     395          self._cont_handler.endPrefixMapping(prefix)
     396  
     397      def start_doctype_decl(self, name, sysid, pubid, has_internal_subset):
     398          self._lex_handler_prop.startDTD(name, pubid, sysid)
     399  
     400      def unparsed_entity_decl(self, name, base, sysid, pubid, notation_name):
     401          self._dtd_handler.unparsedEntityDecl(name, pubid, sysid, notation_name)
     402  
     403      def notation_decl(self, name, base, sysid, pubid):
     404          self._dtd_handler.notationDecl(name, pubid, sysid)
     405  
     406      def external_entity_ref(self, context, base, sysid, pubid):
     407          if not self._external_ges:
     408              return 1
     409  
     410          source = self._ent_handler.resolveEntity(pubid, sysid)
     411          source = saxutils.prepare_input_source(source,
     412                                                 self._source.getSystemId() or
     413                                                 "")
     414  
     415          self._entity_stack.append((self._parser, self._source))
     416          self._parser = self._parser.ExternalEntityParserCreate(context)
     417          self._source = source
     418  
     419          try:
     420              xmlreader.IncrementalParser.parse(self, source)
     421          except:
     422              return 0  # FIXME: save error info here?
     423  
     424          (self._parser, self._source) = self._entity_stack[-1]
     425          del self._entity_stack[-1]
     426          return 1
     427  
     428      def skipped_entity_handler(self, name, is_pe):
     429          if is_pe:
     430              # The SAX spec requires to report skipped PEs with a '%'
     431              name = '%'+name
     432          self._cont_handler.skippedEntity(name)
     433  
     434  # ---
     435  
     436  def create_parser(*args, **kwargs):
     437      return ExpatParser(*args, **kwargs)
     438  
     439  # ---
     440  
     441  if __name__ == "__main__":
     442      import xml.sax.saxutils
     443      p = create_parser()
     444      p.setContentHandler(xml.sax.saxutils.XMLGenerator())
     445      p.setErrorHandler(xml.sax.ErrorHandler())
     446      p.parse("http://www.ibiblio.org/xml/examples/shakespeare/hamlet.xml")