1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
|
"""
SAX driver for the pyexpat C module. This driver works with
pyexpat.__version__ == '2.22'.
"""
version = "0.20"
from xml.sax._exceptions import *
from xml.sax.handler import feature_validation, feature_namespaces
from xml.sax.handler import feature_namespace_prefixes
from xml.sax.handler import feature_external_ges, feature_external_pes
from xml.sax.handler import feature_string_interning
from xml.sax.handler import property_xml_string, property_interning_dict
# xml.parsers.expat does not raise ImportError in Jython
import sys
if sys.platform[:4] == "java":
raise SAXReaderNotAvailable("expat not available in Java", None)
del sys
try:
from xml.parsers import expat
except ImportError:
raise SAXReaderNotAvailable("expat not supported", None)
else:
if not hasattr(expat, "ParserCreate"):
raise SAXReaderNotAvailable("expat not supported", None)
from xml.sax import xmlreader, saxutils, handler
AttributesImpl = xmlreader.AttributesImpl
AttributesNSImpl = xmlreader.AttributesNSImpl
# If we're using a sufficiently recent version of Python, we can use
# weak references to avoid cycles between the parser and content
# handler, otherwise we'll just have to pretend.
try:
import _weakref
except ImportError:
def _mkproxy(o):
return o
else:
import weakref
_mkproxy = weakref.proxy
del weakref, _weakref
class _ClosedParser:
pass
# --- ExpatLocator
class ExpatLocator(xmlreader.Locator):
"""Locator for use with the ExpatParser class.
This uses a weak reference to the parser object to avoid creating
a circular reference between the parser and the content handler.
"""
def __init__(self, parser):
self._ref = _mkproxy(parser)
def getColumnNumber(self):
parser = self._ref
if parser._parser is None:
return None
return parser._parser.ErrorColumnNumber
def getLineNumber(self):
parser = self._ref
if parser._parser is None:
return 1
return parser._parser.ErrorLineNumber
def getPublicId(self):
parser = self._ref
if parser is None:
return None
return parser._source.getPublicId()
def getSystemId(self):
parser = self._ref
if parser is None:
return None
return parser._source.getSystemId()
# --- ExpatParser
class ExpatParser(xmlreader.IncrementalParser, xmlreader.Locator):
"""SAX driver for the pyexpat C module."""
def __init__(self, namespaceHandling=0, bufsize=2**16-20):
xmlreader.IncrementalParser.__init__(self, bufsize)
self._source = xmlreader.InputSource()
self._parser = None
self._namespaces = namespaceHandling
self._lex_handler_prop = None
self._parsing = 0
self._entity_stack = []
self._external_ges = 1
self._interning = None
# XMLReader methods
def parse(self, source):
"Parse an XML document from a URL or an InputSource."
source = saxutils.prepare_input_source(source)
self._source = source
self.reset()
self._cont_handler.setDocumentLocator(ExpatLocator(self))
xmlreader.IncrementalParser.parse(self, source)
def prepareParser(self, source):
if source.getSystemId() is not None:
base = source.getSystemId()
if isinstance(base, unicode):
base = base.encode('utf-8')
self._parser.SetBase(base)
# Redefined setContentHandler to allow changing handlers during parsing
def setContentHandler(self, handler):
xmlreader.IncrementalParser.setContentHandler(self, handler)
if self._parsing:
self._reset_cont_handler()
def getFeature(self, name):
if name == feature_namespaces:
return self._namespaces
elif name == feature_string_interning:
return self._interning is not None
elif name in (feature_validation, feature_external_pes,
feature_namespace_prefixes):
return 0
elif name == feature_external_ges:
return self._external_ges
raise SAXNotRecognizedException("Feature '%s' not recognized" % name)
def setFeature(self, name, state):
if self._parsing:
raise SAXNotSupportedException("Cannot set features while parsing")
if name == feature_namespaces:
self._namespaces = state
elif name == feature_external_ges:
self._external_ges = state
elif name == feature_string_interning:
if state:
if self._interning is None:
self._interning = {}
else:
self._interning = None
elif name == feature_validation:
if state:
raise SAXNotSupportedException(
"expat does not support validation")
elif name == feature_external_pes:
if state:
raise SAXNotSupportedException(
"expat does not read external parameter entities")
elif name == feature_namespace_prefixes:
if state:
raise SAXNotSupportedException(
"expat does not report namespace prefixes")
else:
raise SAXNotRecognizedException(
"Feature '%s' not recognized" % name)
def getProperty(self, name):
if name == handler.property_lexical_handler:
return self._lex_handler_prop
elif name == property_interning_dict:
return self._interning
elif name == property_xml_string:
if self._parser:
if hasattr(self._parser, "GetInputContext"):
return self._parser.GetInputContext()
else:
raise SAXNotRecognizedException(
"This version of expat does not support getting"
" the XML string")
else:
raise SAXNotSupportedException(
"XML string cannot be returned when not parsing")
raise SAXNotRecognizedException("Property '%s' not recognized" % name)
def setProperty(self, name, value):
if name == handler.property_lexical_handler:
self._lex_handler_prop = value
if self._parsing:
self._reset_lex_handler_prop()
elif name == property_interning_dict:
self._interning = value
elif name == property_xml_string:
raise SAXNotSupportedException("Property '%s' cannot be set" %
name)
else:
raise SAXNotRecognizedException("Property '%s' not recognized" %
name)
# IncrementalParser methods
def feed(self, data, isFinal = 0):
if not self._parsing:
self.reset()
self._parsing = 1
self._cont_handler.startDocument()
try:
# The isFinal parameter is internal to the expat reader.
# If it is set to true, expat will check validity of the entire
# document. When feeding chunks, they are not normally final -
# except when invoked from close.
self._parser.Parse(data, isFinal)
except expat.error, e:
exc = SAXParseException(expat.ErrorString(e.code), e, self)
# FIXME: when to invoke error()?
self._err_handler.fatalError(exc)
def close(self):
if (self._entity_stack or self._parser is None or
isinstance(self._parser, _ClosedParser)):
# If we are completing an external entity, do nothing here
return
try:
self.feed("", isFinal = 1)
self._cont_handler.endDocument()
self._parsing = 0
# break cycle created by expat handlers pointing to our methods
self._parser = None
finally:
self._parsing = 0
if self._parser is not None:
# Keep ErrorColumnNumber and ErrorLineNumber after closing.
parser = _ClosedParser()
parser.ErrorColumnNumber = self._parser.ErrorColumnNumber
parser.ErrorLineNumber = self._parser.ErrorLineNumber
self._parser = parser
def _reset_cont_handler(self):
self._parser.ProcessingInstructionHandler = \
self._cont_handler.processingInstruction
self._parser.CharacterDataHandler = self._cont_handler.characters
def _reset_lex_handler_prop(self):
lex = self._lex_handler_prop
parser = self._parser
if lex is None:
parser.CommentHandler = None
parser.StartCdataSectionHandler = None
parser.EndCdataSectionHandler = None
parser.StartDoctypeDeclHandler = None
parser.EndDoctypeDeclHandler = None
else:
parser.CommentHandler = lex.comment
parser.StartCdataSectionHandler = lex.startCDATA
parser.EndCdataSectionHandler = lex.endCDATA
parser.StartDoctypeDeclHandler = self.start_doctype_decl
parser.EndDoctypeDeclHandler = lex.endDTD
def reset(self):
if self._namespaces:
self._parser = expat.ParserCreate(self._source.getEncoding(), " ",
intern=self._interning)
self._parser.namespace_prefixes = 1
self._parser.StartElementHandler = self.start_element_ns
self._parser.EndElementHandler = self.end_element_ns
else:
self._parser = expat.ParserCreate(self._source.getEncoding(),
intern = self._interning)
self._parser.StartElementHandler = self.start_element
self._parser.EndElementHandler = self.end_element
self._reset_cont_handler()
self._parser.UnparsedEntityDeclHandler = self.unparsed_entity_decl
self._parser.NotationDeclHandler = self.notation_decl
self._parser.StartNamespaceDeclHandler = self.start_namespace_decl
self._parser.EndNamespaceDeclHandler = self.end_namespace_decl
self._decl_handler_prop = None
if self._lex_handler_prop:
self._reset_lex_handler_prop()
# self._parser.DefaultHandler =
# self._parser.DefaultHandlerExpand =
# self._parser.NotStandaloneHandler =
self._parser.ExternalEntityRefHandler = self.external_entity_ref
try:
self._parser.SkippedEntityHandler = self.skipped_entity_handler
except AttributeError:
# This pyexpat does not support SkippedEntity
pass
self._parser.SetParamEntityParsing(
expat.XML_PARAM_ENTITY_PARSING_UNLESS_STANDALONE)
self._parsing = 0
self._entity_stack = []
# Locator methods
def getColumnNumber(self):
if self._parser is None:
return None
return self._parser.ErrorColumnNumber
def getLineNumber(self):
if self._parser is None:
return 1
return self._parser.ErrorLineNumber
def getPublicId(self):
return self._source.getPublicId()
def getSystemId(self):
return self._source.getSystemId()
# event handlers
def start_element(self, name, attrs):
self._cont_handler.startElement(name, AttributesImpl(attrs))
def end_element(self, name):
self._cont_handler.endElement(name)
def start_element_ns(self, name, attrs):
pair = name.split()
if len(pair) == 1:
# no namespace
pair = (None, name)
elif len(pair) == 3:
pair = pair[0], pair[1]
else:
# default namespace
pair = tuple(pair)
newattrs = {}
qnames = {}
for (aname, value) in attrs.items():
parts = aname.split()
length = len(parts)
if length == 1:
# no namespace
qname = aname
apair = (None, aname)
elif length == 3:
qname = "%s:%s" % (parts[2], parts[1])
apair = parts[0], parts[1]
else:
# default namespace
qname = parts[1]
apair = tuple(parts)
newattrs[apair] = value
qnames[apair] = qname
self._cont_handler.startElementNS(pair, None,
AttributesNSImpl(newattrs, qnames))
def end_element_ns(self, name):
pair = name.split()
if len(pair) == 1:
pair = (None, name)
elif len(pair) == 3:
pair = pair[0], pair[1]
else:
pair = tuple(pair)
self._cont_handler.endElementNS(pair, None)
# this is not used (call directly to ContentHandler)
def processing_instruction(self, target, data):
self._cont_handler.processingInstruction(target, data)
# this is not used (call directly to ContentHandler)
def character_data(self, data):
self._cont_handler.characters(data)
def start_namespace_decl(self, prefix, uri):
self._cont_handler.startPrefixMapping(prefix, uri)
def end_namespace_decl(self, prefix):
self._cont_handler.endPrefixMapping(prefix)
def start_doctype_decl(self, name, sysid, pubid, has_internal_subset):
self._lex_handler_prop.startDTD(name, pubid, sysid)
def unparsed_entity_decl(self, name, base, sysid, pubid, notation_name):
self._dtd_handler.unparsedEntityDecl(name, pubid, sysid, notation_name)
def notation_decl(self, name, base, sysid, pubid):
self._dtd_handler.notationDecl(name, pubid, sysid)
def external_entity_ref(self, context, base, sysid, pubid):
if not self._external_ges:
return 1
source = self._ent_handler.resolveEntity(pubid, sysid)
source = saxutils.prepare_input_source(source,
self._source.getSystemId() or
"")
self._entity_stack.append((self._parser, self._source))
self._parser = self._parser.ExternalEntityParserCreate(context)
self._source = source
try:
xmlreader.IncrementalParser.parse(self, source)
except:
return 0 # FIXME: save error info here?
(self._parser, self._source) = self._entity_stack[-1]
del self._entity_stack[-1]
return 1
def skipped_entity_handler(self, name, is_pe):
if is_pe:
# The SAX spec requires to report skipped PEs with a '%'
name = '%'+name
self._cont_handler.skippedEntity(name)
# ---
def create_parser(*args, **kwargs):
return ExpatParser(*args, **kwargs)
# ---
if __name__ == "__main__":
import xml.sax.saxutils
p = create_parser()
p.setContentHandler(xml.sax.saxutils.XMLGenerator())
p.setErrorHandler(xml.sax.ErrorHandler())
p.parse("http://www.ibiblio.org/xml/examples/shakespeare/hamlet.xml")
|