python (3.12.0)
1 """
2 SAX driver for the pyexpat C module. This driver works with
3 pyexpat.__version__ == '2.22'.
4 """
5
6 version = "0.20"
7
8 from xml.sax._exceptions import *
9 from xml.sax.handler import feature_validation, feature_namespaces
10 from xml.sax.handler import feature_namespace_prefixes
11 from xml.sax.handler import feature_external_ges, feature_external_pes
12 from xml.sax.handler import feature_string_interning
13 from xml.sax.handler import property_xml_string, property_interning_dict
14
15 try:
16 from xml.parsers import expat
17 except ImportError:
18 raise SAXReaderNotAvailable("expat not supported", None)
19 else:
20 if not hasattr(expat, "ParserCreate"):
21 raise SAXReaderNotAvailable("expat not supported", None)
22 from xml.sax import xmlreader, saxutils, handler
23
# Module-level shorthands for the attribute containers that the element
# event handlers wrap expat's attribute dictionaries in.
AttributesImpl, AttributesNSImpl = (
    xmlreader.AttributesImpl,
    xmlreader.AttributesNSImpl,
)
26
27 # If we're using a sufficiently recent version of Python, we can use
28 # weak references to avoid cycles between the parser and content
29 # handler, otherwise we'll just have to pretend.
30 try:
31 import _weakref
32 except ImportError:
33 def _mkproxy(o):
34 return o
35 else:
36 import weakref
37 _mkproxy = weakref.proxy
38 del weakref, _weakref
39
40 class ESC[4;38;5;81m_ClosedParser:
41 pass
42
43 # --- ExpatLocator
44
class ExpatLocator(xmlreader.Locator):
    """Locator for use with the ExpatParser class.

    This uses a weak reference to the parser object to avoid creating
    a circular reference between the parser and the content handler.

    Once the referenced reader has gone away the locator answers as if
    no position or source were known: column None, line 1, and None
    for both identifiers.
    """

    def __init__(self, parser):
        # _mkproxy yields a weak proxy (or the reader itself when weak
        # references are unavailable); it is never None.
        self._ref = _mkproxy(parser)

    def _expat(self):
        """Return the reader's current expat parser, or None.

        None is returned both when the reader has no parser and when
        the reader itself no longer exists.
        """
        try:
            return self._ref._parser
        except ReferenceError:
            # The weakly referenced reader has been collected.
            return None

    def _input_source(self):
        """Return the reader's current input source, or None if the
        reader no longer exists."""
        try:
            return self._ref._source
        except ReferenceError:
            return None

    def getColumnNumber(self):
        """Return the parser's ErrorColumnNumber, or None without a parser."""
        expat_parser = self._expat()
        if expat_parser is None:
            return None
        return expat_parser.ErrorColumnNumber

    def getLineNumber(self):
        """Return the parser's ErrorLineNumber, or 1 without a parser."""
        expat_parser = self._expat()
        if expat_parser is None:
            return 1
        return expat_parser.ErrorLineNumber

    def getPublicId(self):
        """Return the public identifier of the reader's current source."""
        source = self._input_source()
        if source is None:
            return None
        return source.getPublicId()

    def getSystemId(self):
        """Return the system identifier of the reader's current source."""
        source = self._input_source()
        if source is None:
            return None
        return source.getSystemId()
77
78
79 # --- ExpatParser
80
class ExpatParser(xmlreader.IncrementalParser, xmlreader.Locator):
    """SAX driver for the pyexpat C module."""

    def __init__(self, namespaceHandling=0, bufsize=2**16-20):
        """Create a reader; the expat parser itself is built by reset().

        namespaceHandling is the initial state of the namespaces
        feature and bufsize is passed on to IncrementalParser.
        """
        xmlreader.IncrementalParser.__init__(self, bufsize)
        self._source = xmlreader.InputSource()
        self._parser = None
        self._namespaces = namespaceHandling
        self._lex_handler_prop = None
        self._parsing = False
        # (parser, source) pairs of the documents suspended while an
        # external entity is being read.
        self._entity_stack = []
        self._external_ges = 0
        self._interning = None

    # XMLReader methods

    def parse(self, source):
        "Parse an XML document from a URL or an InputSource."
        source = saxutils.prepare_input_source(source)

        self._source = source
        try:
            self.reset()
            self._cont_handler.setDocumentLocator(ExpatLocator(self))
            xmlreader.IncrementalParser.parse(self, source)
        except:
            # bpo-30264: Close the source on error to not leak resources:
            # xml.sax.parse() doesn't give access to the underlying parser
            # to the caller
            self._close_source()
            raise

    def prepareParser(self, source):
        """Use the source's system id, when it has one, as expat's base."""
        if source.getSystemId() is not None:
            self._parser.SetBase(source.getSystemId())

    # Redefined setContentHandler to allow changing handlers during parsing

    def setContentHandler(self, handler):
        xmlreader.IncrementalParser.setContentHandler(self, handler)
        if self._parsing:
            # expat holds bound methods of the old handler; rebind them.
            self._reset_cont_handler()

    def getFeature(self, name):
        """Return the state of the named feature.

        Raises SAXNotRecognizedException for unknown feature names.
        """
        if name == feature_namespaces:
            return self._namespaces
        elif name == feature_string_interning:
            return self._interning is not None
        elif name in (feature_validation, feature_external_pes,
                      feature_namespace_prefixes):
            return 0
        elif name == feature_external_ges:
            return self._external_ges
        raise SAXNotRecognizedException("Feature '%s' not recognized" % name)

    def setFeature(self, name, state):
        """Set the named feature.

        Raises SAXNotSupportedException while parsing or when asked to
        enable something expat cannot do, and
        SAXNotRecognizedException for unknown feature names.
        """
        if self._parsing:
            raise SAXNotSupportedException("Cannot set features while parsing")

        if name == feature_namespaces:
            self._namespaces = state
        elif name == feature_external_ges:
            self._external_ges = state
        elif name == feature_string_interning:
            if state:
                # Keep an already installed interning dictionary.
                if self._interning is None:
                    self._interning = {}
            else:
                self._interning = None
        elif name == feature_validation:
            if state:
                raise SAXNotSupportedException(
                    "expat does not support validation")
        elif name == feature_external_pes:
            if state:
                raise SAXNotSupportedException(
                    "expat does not read external parameter entities")
        elif name == feature_namespace_prefixes:
            if state:
                raise SAXNotSupportedException(
                    "expat does not report namespace prefixes")
        else:
            raise SAXNotRecognizedException(
                "Feature '%s' not recognized" % name)

    def getProperty(self, name):
        """Return the value of the named property.

        The XML string is only available while an expat parser is
        attached; otherwise SAXNotSupportedException is raised.  Unknown
        property names raise SAXNotRecognizedException.
        """
        if name == handler.property_lexical_handler:
            return self._lex_handler_prop
        elif name == property_interning_dict:
            return self._interning
        elif name == property_xml_string:
            parser = self._parser
            # After a failed close() the parser slot holds a
            # _ClosedParser placeholder; like None, that means we are
            # not parsing (close() makes the same distinction).
            if parser is None or isinstance(parser, _ClosedParser):
                raise SAXNotSupportedException(
                    "XML string cannot be returned when not parsing")
            if hasattr(parser, "GetInputContext"):
                return parser.GetInputContext()
            raise SAXNotRecognizedException(
                "This version of expat does not support getting"
                " the XML string")
        raise SAXNotRecognizedException("Property '%s' not recognized" % name)

    def setProperty(self, name, value):
        """Set the named property.

        The XML string is read-only (SAXNotSupportedException); unknown
        property names raise SAXNotRecognizedException.
        """
        if name == handler.property_lexical_handler:
            self._lex_handler_prop = value
            if self._parsing:
                self._reset_lex_handler_prop()
        elif name == property_interning_dict:
            self._interning = value
        elif name == property_xml_string:
            raise SAXNotSupportedException("Property '%s' cannot be set" %
                                           name)
        else:
            raise SAXNotRecognizedException("Property '%s' not recognized" %
                                            name)

    # IncrementalParser methods

    def feed(self, data, isFinal=False):
        """Hand a chunk of data to expat.

        The first chunk resets the parser and emits startDocument().
        expat errors are wrapped in SAXParseException and passed to the
        error handler's fatalError().
        """
        if not self._parsing:
            self.reset()
            self._parsing = True
            self._cont_handler.startDocument()

        try:
            # The isFinal parameter is internal to the expat reader.
            # If it is set to true, expat will check validity of the entire
            # document. When feeding chunks, they are not normally final -
            # except when invoked from close.
            self._parser.Parse(data, isFinal)
        except expat.error as e:
            exc = SAXParseException(expat.ErrorString(e.code), e, self)
            # FIXME: when to invoke error()?
            self._err_handler.fatalError(exc)

    def _close_source(self):
        """Close the character and byte streams of the current source."""
        source = self._source
        try:
            file = source.getCharacterStream()
            if file is not None:
                file.close()
        finally:
            # Runs even if closing the character stream failed.
            file = source.getByteStream()
            if file is not None:
                file.close()

    def close(self):
        """Finish the document, emit endDocument() and close the source.

        Does nothing when there is no live parser or while an external
        entity is being read.
        """
        if (self._entity_stack or self._parser is None or
            isinstance(self._parser, _ClosedParser)):
            # If we are completing an external entity, do nothing here
            return
        try:
            self.feed(b"", isFinal=True)
            self._cont_handler.endDocument()
            # break cycle created by expat handlers pointing to our methods
            self._parser = None
        finally:
            self._parsing = False
            if self._parser is not None:
                # Keep ErrorColumnNumber and ErrorLineNumber after closing.
                parser = _ClosedParser()
                parser.ErrorColumnNumber = self._parser.ErrorColumnNumber
                parser.ErrorLineNumber = self._parser.ErrorLineNumber
                self._parser = parser
            self._close_source()

    def _reset_cont_handler(self):
        """Bind expat's PI and character-data callbacks straight to the
        current content handler."""
        self._parser.ProcessingInstructionHandler = \
                                    self._cont_handler.processingInstruction
        self._parser.CharacterDataHandler = self._cont_handler.characters

    def _reset_lex_handler_prop(self):
        """Install, or clear, expat's lexical callbacks to match the
        lexical-handler property."""
        lex = self._lex_handler_prop
        parser = self._parser
        if lex is None:
            parser.CommentHandler = None
            parser.StartCdataSectionHandler = None
            parser.EndCdataSectionHandler = None
            parser.StartDoctypeDeclHandler = None
            parser.EndDoctypeDeclHandler = None
        else:
            parser.CommentHandler = lex.comment
            parser.StartCdataSectionHandler = lex.startCDATA
            parser.EndCdataSectionHandler = lex.endCDATA
            # Routed through self because expat's argument order differs
            # from startDTD's.
            parser.StartDoctypeDeclHandler = self.start_doctype_decl
            parser.EndDoctypeDeclHandler = lex.endDTD

    def reset(self):
        """Create a fresh expat parser and wire up all callbacks."""
        if self._namespaces:
            # " " separates uri, local name and prefix in reported names.
            self._parser = expat.ParserCreate(self._source.getEncoding(), " ",
                                              intern=self._interning)
            self._parser.namespace_prefixes = 1
            self._parser.StartElementHandler = self.start_element_ns
            self._parser.EndElementHandler = self.end_element_ns
        else:
            self._parser = expat.ParserCreate(self._source.getEncoding(),
                                              intern = self._interning)
            self._parser.StartElementHandler = self.start_element
            self._parser.EndElementHandler = self.end_element

        self._reset_cont_handler()
        self._parser.UnparsedEntityDeclHandler = self.unparsed_entity_decl
        self._parser.NotationDeclHandler = self.notation_decl
        self._parser.StartNamespaceDeclHandler = self.start_namespace_decl
        self._parser.EndNamespaceDeclHandler = self.end_namespace_decl

        self._decl_handler_prop = None
        if self._lex_handler_prop:
            self._reset_lex_handler_prop()
#         self._parser.DefaultHandler =
#         self._parser.DefaultHandlerExpand =
#         self._parser.NotStandaloneHandler =
        self._parser.ExternalEntityRefHandler = self.external_entity_ref
        try:
            self._parser.SkippedEntityHandler = self.skipped_entity_handler
        except AttributeError:
            # This pyexpat does not support SkippedEntity
            pass
        self._parser.SetParamEntityParsing(
            expat.XML_PARAM_ENTITY_PARSING_UNLESS_STANDALONE)

        self._parsing = False
        self._entity_stack = []

    # Locator methods

    def getColumnNumber(self):
        if self._parser is None:
            return None
        return self._parser.ErrorColumnNumber

    def getLineNumber(self):
        if self._parser is None:
            return 1
        return self._parser.ErrorLineNumber

    def getPublicId(self):
        return self._source.getPublicId()

    def getSystemId(self):
        return self._source.getSystemId()

    # event handlers
    def start_element(self, name, attrs):
        self._cont_handler.startElement(name, AttributesImpl(attrs))

    def end_element(self, name):
        self._cont_handler.endElement(name)

    @staticmethod
    def _ns_pair(name, parts):
        """Return the SAX name pair for an expat name.

        parts is name split on whitespace.  One part is a name without
        a namespace, three parts are (uri, localname, prefix), and
        anything else (a default-namespace name) is used as it stands.
        """
        count = len(parts)
        if count == 1:
            # no namespace
            return (None, name)
        if count == 3:
            # drop the prefix
            return parts[0], parts[1]
        # default namespace
        return tuple(parts)

    def start_element_ns(self, name, attrs):
        pair = self._ns_pair(name, name.split())

        newattrs = {}
        qnames = {}
        for (aname, value) in attrs.items():
            parts = aname.split()
            length = len(parts)
            if length == 1:
                # no namespace
                qname = aname
            elif length == 3:
                qname = "%s:%s" % (parts[2], parts[1])
            else:
                # default namespace
                qname = parts[1]
            apair = self._ns_pair(aname, parts)

            newattrs[apair] = value
            qnames[apair] = qname

        self._cont_handler.startElementNS(pair, None,
                                          AttributesNSImpl(newattrs, qnames))

    def end_element_ns(self, name):
        pair = self._ns_pair(name, name.split())
        self._cont_handler.endElementNS(pair, None)

    # this is not used (call directly to ContentHandler)
    def processing_instruction(self, target, data):
        self._cont_handler.processingInstruction(target, data)

    # this is not used (call directly to ContentHandler)
    def character_data(self, data):
        self._cont_handler.characters(data)

    def start_namespace_decl(self, prefix, uri):
        self._cont_handler.startPrefixMapping(prefix, uri)

    def end_namespace_decl(self, prefix):
        self._cont_handler.endPrefixMapping(prefix)

    def start_doctype_decl(self, name, sysid, pubid, has_internal_subset):
        # startDTD takes the public id before the system id.
        self._lex_handler_prop.startDTD(name, pubid, sysid)

    def unparsed_entity_decl(self, name, base, sysid, pubid, notation_name):
        self._dtd_handler.unparsedEntityDecl(name, pubid, sysid, notation_name)

    def notation_decl(self, name, base, sysid, pubid):
        self._dtd_handler.notationDecl(name, pubid, sysid)

    def external_entity_ref(self, context, base, sysid, pubid):
        """Read an external entity with a child expat parser.

        Returns 1 when the entity was skipped (external general entities
        disabled) or read successfully, and 0 when reading it failed.
        """
        if not self._external_ges:
            return 1

        source = self._ent_handler.resolveEntity(pubid, sysid)
        source = saxutils.prepare_input_source(source,
                                               self._source.getSystemId() or
                                               "")

        # Suspend the current document while the entity is read.
        self._entity_stack.append((self._parser, self._source))
        self._parser = self._parser.ExternalEntityParserCreate(context)
        self._source = source

        try:
            xmlreader.IncrementalParser.parse(self, source)
        except:
            # NOTE(review): the suspended parser/source are not restored
            # on this path, so the entity stack stays non-empty.
            return 0  # FIXME: save error info here?

        (self._parser, self._source) = self._entity_stack.pop()
        return 1

    def skipped_entity_handler(self, name, is_pe):
        if is_pe:
            # The SAX spec requires to report skipped PEs with a '%'
            name = '%'+name
        self._cont_handler.skippedEntity(name)
427
428 # ---
429
def create_parser(*args, **kwargs):
    """Build an ExpatParser, passing every argument through unchanged."""
    parser = ExpatParser(*args, **kwargs)
    return parser
432
433 # ---
434
if __name__ == "__main__":
    # Manual smoke test: parse a remote document and hand its events to
    # an XMLGenerator.
    import xml.sax.saxutils
    p = create_parser()
    p.setErrorHandler(xml.sax.ErrorHandler())
    p.setContentHandler(xml.sax.saxutils.XMLGenerator())
    p.parse("http://www.ibiblio.org/xml/examples/shakespeare/hamlet.xml")