• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Implementation of the DOM Level 3 'LS-Load' feature."""
2
3import copy
4import warnings
5import xml.dom
6
7from xml.dom.NodeFilter import NodeFilter
8
9
10__all__ = ["DOMBuilder", "DOMEntityResolver", "DOMInputSource"]
11
12
class Options:
    """Settings holder with one class attribute per DOMBuilder feature.

    The DOMBuilder class keeps an instance of this class and uses it to
    hand its settings over to the ExpatBuilder class.
    """

    # The DOMBuilder class in LoadSave restricts which of these values can
    # actually be changed through the DOM Level 3 LoadSave feature.

    # Namespace features.
    namespaces = 1
    namespace_declarations = True

    # Validation features.
    validation = False
    validate = False
    validate_if_schema = False
    datatype_normalization = False

    # External entity / DTD features.
    external_general_entities = True
    external_parameter_entities = True
    external_dtd_subset = True

    # Document content features.
    entities = True
    create_entity_ref_nodes = True
    whitespace_in_element_content = True
    cdata_sections = True
    comments = True

    # Remaining features.
    charset_overrides_xml_encoding = True
    infoset = False
    supported_mediatypes_only = False

    # Callback objects; None until something is assigned.
    errorHandler = None
    filter = None
43
44
class DOMBuilder:
    """Synchronous DOM Level 3 'LS-Load' builder.

    Feature names passed to the ``*Feature`` methods are normalised with
    ``_name_xform`` and stored on an ``Options`` instance; a copy of that
    instance is handed to ``xml.dom.expatbuilder`` for every parse.
    """

    entityResolver = None
    errorHandler = None
    filter = None

    ACTION_REPLACE = 1
    ACTION_APPEND_AS_CHILDREN = 2
    ACTION_INSERT_AFTER = 3
    ACTION_INSERT_BEFORE = 4

    _legal_actions = (ACTION_REPLACE, ACTION_APPEND_AS_CHILDREN,
                      ACTION_INSERT_AFTER, ACTION_INSERT_BEFORE)

    def __init__(self):
        self._options = Options()

    def _get_entityResolver(self):
        """Return the entity resolver used by parseURI() (None if unset)."""
        return self.entityResolver
    def _set_entityResolver(self, entityResolver):
        """Set the entity resolver used by parseURI()."""
        self.entityResolver = entityResolver

    def _get_errorHandler(self):
        """Return the error handler copied into the options on parse()."""
        return self.errorHandler
    def _set_errorHandler(self, errorHandler):
        """Set the error handler copied into the options on parse()."""
        self.errorHandler = errorHandler

    def _get_filter(self):
        """Return the filter copied into the options on parse()."""
        return self.filter
    def _set_filter(self, filter):
        """Set the filter copied into the options on parse()."""
        self.filter = filter

    def setFeature(self, name, state):
        """Switch the feature *name* on or off according to *state*.

        *state* is reduced to 1 or 0 by its truth value.

        Raises xml.dom.NotFoundErr if the feature is unknown, and
        xml.dom.NotSupportedErr if the feature is known but the requested
        state has no entry in ``_settings``.
        """
        if not self.supportsFeature(name):
            raise xml.dom.NotFoundErr("unknown feature: " + repr(name))
        state = 1 if state else 0
        try:
            settings = self._settings[(_name_xform(name), state)]
        except KeyError:
            raise xml.dom.NotSupportedErr(
                "unsupported feature: %r" % (name,)) from None
        # One feature may imply several option changes (see "infoset").
        for option, value in settings:
            setattr(self._options, option, value)

    def supportsFeature(self, name):
        """Return True if the options object has an attribute for *name*."""
        return hasattr(self._options, _name_xform(name))

    def canSetFeature(self, name, state):
        """Return True if (*name*, truth value of *state*) is settable."""
        key = (_name_xform(name), 1 if state else 0)
        return key in self._settings

    # This dictionary maps from (feature,value) to a list of
    # (option,value) pairs that should be set on the Options object.
    # If a (feature,value) setting is not in this dictionary, it is
    # not supported by the DOMBuilder.
    #
    _settings = {
        ("namespace_declarations", 0): [
            ("namespace_declarations", 0)],
        ("namespace_declarations", 1): [
            ("namespace_declarations", 1)],
        ("validation", 0): [
            ("validation", 0)],
        ("external_general_entities", 0): [
            ("external_general_entities", 0)],
        ("external_general_entities", 1): [
            ("external_general_entities", 1)],
        ("external_parameter_entities", 0): [
            ("external_parameter_entities", 0)],
        ("external_parameter_entities", 1): [
            ("external_parameter_entities", 1)],
        ("validate_if_schema", 0): [
            ("validate_if_schema", 0)],
        ("create_entity_ref_nodes", 0): [
            ("create_entity_ref_nodes", 0)],
        ("create_entity_ref_nodes", 1): [
            ("create_entity_ref_nodes", 1)],
        ("entities", 0): [
            ("create_entity_ref_nodes", 0),
            ("entities", 0)],
        ("entities", 1): [
            ("entities", 1)],
        ("whitespace_in_element_content", 0): [
            ("whitespace_in_element_content", 0)],
        ("whitespace_in_element_content", 1): [
            ("whitespace_in_element_content", 1)],
        ("cdata_sections", 0): [
            ("cdata_sections", 0)],
        ("cdata_sections", 1): [
            ("cdata_sections", 1)],
        ("comments", 0): [
            ("comments", 0)],
        ("comments", 1): [
            ("comments", 1)],
        ("charset_overrides_xml_encoding", 0): [
            ("charset_overrides_xml_encoding", 0)],
        ("charset_overrides_xml_encoding", 1): [
            ("charset_overrides_xml_encoding", 1)],
        ("infoset", 0): [],
        ("infoset", 1): [
            ("namespace_declarations", 0),
            ("validate_if_schema", 0),
            ("create_entity_ref_nodes", 0),
            ("entities", 0),
            ("cdata_sections", 0),
            ("datatype_normalization", 1),
            ("whitespace_in_element_content", 1),
            ("comments", 1),
            ("charset_overrides_xml_encoding", 1)],
        ("supported_mediatypes_only", 0): [
            ("supported_mediatypes_only", 0)],
        ("namespaces", 0): [
            ("namespaces", 0)],
        ("namespaces", 1): [
            ("namespaces", 1)],
    }

    def getFeature(self, name):
        """Return the current value stored for the feature *name*.

        Raises xml.dom.NotFoundErr if the options object has no attribute
        for the feature.
        """
        xname = _name_xform(name)
        try:
            return getattr(self._options, xname)
        except AttributeError:
            # NOTE(review): Options defines an "infoset" attribute, so with
            # an Options instance in self._options this computed fallback
            # is never reached; it is kept unchanged for options objects
            # that lack the attribute.
            if name == "infoset":
                options = self._options
                return (options.datatype_normalization
                        and options.whitespace_in_element_content
                        and options.comments
                        and options.charset_overrides_xml_encoding
                        and not (options.namespace_declarations
                                 or options.validate_if_schema
                                 or options.create_entity_ref_nodes
                                 or options.entities
                                 or options.cdata_sections))
            raise xml.dom.NotFoundErr("feature %s not known" % repr(name))

    def parseURI(self, uri):
        """Resolve *uri* to an input source and parse it.

        Uses the configured entity resolver when one is set, otherwise a
        fresh DOMEntityResolver.
        """
        if self.entityResolver:
            input = self.entityResolver.resolveEntity(None, uri)
        else:
            input = DOMEntityResolver().resolveEntity(None, uri)
        return self.parse(input)

    def parse(self, input):
        """Parse the input source *input* and return the builder's result.

        The byte stream of *input* is used when present; otherwise, if
        *input* has a system id, that id is opened with
        urllib.request.urlopen() and the resulting stream is closed once
        parsing has finished.
        """
        # Work on a copy so per-parse values never leak into self._options.
        options = copy.copy(self._options)
        options.filter = self.filter
        options.errorHandler = self.errorHandler
        fp = input.byteStream
        # The system id lives on the input source, not on the options.
        if fp is None and input.systemId:
            import urllib.request
            with urllib.request.urlopen(input.systemId) as fp:
                return self._parse_bytestream(fp, options)
        return self._parse_bytestream(fp, options)

    def parseWithContext(self, input, cnode, action):
        """Not implemented; only validates *action*.

        Raises ValueError for an action outside the ACTION_* constants and
        NotImplementedError otherwise.
        """
        if action not in self._legal_actions:
            raise ValueError("not a legal action")
        raise NotImplementedError("Haven't written this yet...")

    def _parse_bytestream(self, stream, options):
        """Build a parser for *options* and run it over *stream*."""
        import xml.dom.expatbuilder
        builder = xml.dom.expatbuilder.makeBuilder(options)
        return builder.parseFile(stream)
207
208
def _name_xform(name):
    """Return *name* lower-cased with every '-' turned into '_'."""
    lowered = name.lower()
    return "_".join(lowered.split("-"))
211
212
class DOMEntityResolver(object):
    """Turn a system identifier into a populated DOMInputSource.

    The URL opener is created on first use and cached in the ``_opener``
    slot.
    """

    __slots__ = '_opener',

    def resolveEntity(self, publicId, systemId):
        """Open *systemId* and return a DOMInputSource describing it.

        The returned source carries the given ids, the opened byte stream,
        the encoding reported by the transport (or None) and, when the URL
        path does not end in "/", a base URI pointing at the path's
        directory.
        """
        assert systemId is not None
        source = DOMInputSource()
        source.publicId = publicId
        source.systemId = systemId
        source.byteStream = self._get_opener().open(systemId)

        # determine the encoding if the transport provided it
        source.encoding = self._guess_media_encoding(source)

        # determine the base URI if we can
        import posixpath
        import urllib.parse
        parts = urllib.parse.urlparse(systemId)
        scheme, netloc, path, params, query, fragment = parts
        # XXX should we check the scheme here as well?
        if path and not path.endswith("/"):
            path = posixpath.dirname(path) + "/"
            parts = scheme, netloc, path, params, query, fragment
            source.baseURI = urllib.parse.urlunparse(parts)

        return source

    def _get_opener(self):
        """Return the cached opener, creating it on first access."""
        try:
            return self._opener
        except AttributeError:
            self._opener = self._create_opener()
            return self._opener

    def _create_opener(self):
        """Build the opener used to fetch system identifiers."""
        import urllib.request
        return urllib.request.build_opener()

    def _guess_media_encoding(self, source):
        """Return the lower-cased charset from the stream's Content-Type.

        Returns None when the stream reports no Content-Type header or
        the header carries no charset parameter.
        """
        info = source.byteStream.info()
        if "Content-Type" in info:
            # urllib responses expose headers as an email.message.Message;
            # the old mimetools getplist() method does not exist on it.
            return info.get_content_charset()
        return None
255
256
class DOMInputSource(object):
    """Slot-only attribute holder for an input source.

    Every slot starts out as None; each one has a matching
    ``_get_*`` / ``_set_*`` accessor pair.
    """

    __slots__ = ('byteStream', 'characterStream', 'stringData',
                 'encoding', 'publicId', 'systemId', 'baseURI')

    def __init__(self):
        # Initialise every declared slot, in declaration order.
        for slot in DOMInputSource.__slots__:
            setattr(self, slot, None)

    # --- byteStream ---
    def _get_byteStream(self):
        return self.byteStream

    def _set_byteStream(self, byteStream):
        self.byteStream = byteStream

    # --- characterStream ---
    def _get_characterStream(self):
        return self.characterStream

    def _set_characterStream(self, characterStream):
        self.characterStream = characterStream

    # --- stringData ---
    def _get_stringData(self):
        return self.stringData

    def _set_stringData(self, data):
        self.stringData = data

    # --- encoding ---
    def _get_encoding(self):
        return self.encoding

    def _set_encoding(self, encoding):
        self.encoding = encoding

    # --- publicId ---
    def _get_publicId(self):
        return self.publicId

    def _set_publicId(self, publicId):
        self.publicId = publicId

    # --- systemId ---
    def _get_systemId(self):
        return self.systemId

    def _set_systemId(self, systemId):
        self.systemId = systemId

    # --- baseURI ---
    def _get_baseURI(self):
        return self.baseURI

    def _set_baseURI(self, uri):
        self.baseURI = uri
304
305
class DOMBuilderFilter:
    """Base element filter for tailoring how a DOM instance is built.

    Both hooks accept everything by default.
    """

    # This class is not strictly needed: a concrete filter can simply
    # provide acceptNode() and startContainer() itself.  Deriving from it
    # makes it easy to override only one of the two.

    FILTER_ACCEPT, FILTER_REJECT, FILTER_SKIP, FILTER_INTERRUPT = range(1, 5)

    whatToShow = NodeFilter.SHOW_ALL

    def _get_whatToShow(self):
        """Return the whatToShow attribute."""
        return self.whatToShow

    def acceptNode(self, element):
        """Default hook: accept *element*."""
        verdict = self.FILTER_ACCEPT
        return verdict

    def startContainer(self, element):
        """Default hook: accept the container *element*."""
        verdict = self.FILTER_ACCEPT
        return verdict
331
332del NodeFilter
333
334
class DocumentLS:
    """Mixin giving documents the load/save spec's interface."""

    async_ = False

    def _get_async(self):
        """Always report synchronous operation."""
        return False

    def _set_async(self, flag):
        """Accept a false *flag*; reject any request for async loading."""
        if not flag:
            return
        raise xml.dom.NotSupportedErr(
            "asynchronous document loading is not supported")

    def abort(self):
        """Not implemented."""
        # Open question: what would "clearing" a document mean?  Would
        # the documentElement go away?
        raise NotImplementedError(
            "haven't figured out what this means yet")

    def load(self, uri):
        """Not implemented."""
        raise NotImplementedError("haven't written this yet")

    def loadXML(self, source):
        """Not implemented."""
        raise NotImplementedError("haven't written this yet")

    def saveXML(self, snode):
        """Return the toxml() output of *snode*, or of self when it is None.

        Raises xml.dom.WrongDocumentErr if *snode* belongs to another
        document.
        """
        if snode is None:
            return self.toxml()
        if snode.ownerDocument is not self:
            raise xml.dom.WrongDocumentErr()
        return snode.toxml()
366
367
class DOMImplementationLS:
    """Factory methods for the load/save objects defined in this module."""

    MODE_SYNCHRONOUS = 1
    MODE_ASYNCHRONOUS = 2

    def createDOMBuilder(self, mode, schemaType):
        """Return a new DOMBuilder for synchronous *mode*.

        Raises xml.dom.NotSupportedErr when *schemaType* is not None or
        the asynchronous mode is requested, and ValueError for any other
        *mode* value.
        """
        if schemaType is not None:
            raise xml.dom.NotSupportedErr(
                "schemaType not yet supported")
        if mode == self.MODE_SYNCHRONOUS:
            builder = DOMBuilder()
        elif mode == self.MODE_ASYNCHRONOUS:
            raise xml.dom.NotSupportedErr(
                "asynchronous builders are not supported")
        else:
            raise ValueError("unknown value for mode")
        return builder

    def createDOMWriter(self):
        """Not implemented."""
        raise NotImplementedError(
            "the writer interface hasn't been written yet!")

    def createDOMInputSource(self):
        """Return a new, empty DOMInputSource."""
        source = DOMInputSource()
        return source
389