• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1
2__all__ = ['Composer', 'ComposerError']
3
4from .error import MarkedYAMLError
5from .events import *
6from .nodes import *
7
8class ComposerError(MarkedYAMLError):
9    pass
10
11class Composer:
12
13    def __init__(self):
14        self.anchors = {}
15
16    def check_node(self):
17        # Drop the STREAM-START event.
18        if self.check_event(StreamStartEvent):
19            self.get_event()
20
21        # If there are more documents available?
22        return not self.check_event(StreamEndEvent)
23
24    def get_node(self):
25        # Get the root node of the next document.
26        if not self.check_event(StreamEndEvent):
27            return self.compose_document()
28
29    def get_single_node(self):
30        # Drop the STREAM-START event.
31        self.get_event()
32
33        # Compose a document if the stream is not empty.
34        document = None
35        if not self.check_event(StreamEndEvent):
36            document = self.compose_document()
37
38        # Ensure that the stream contains no more documents.
39        if not self.check_event(StreamEndEvent):
40            event = self.get_event()
41            raise ComposerError("expected a single document in the stream",
42                    document.start_mark, "but found another document",
43                    event.start_mark)
44
45        # Drop the STREAM-END event.
46        self.get_event()
47
48        return document
49
50    def compose_document(self):
51        # Drop the DOCUMENT-START event.
52        self.get_event()
53
54        # Compose the root node.
55        node = self.compose_node(None, None)
56
57        # Drop the DOCUMENT-END event.
58        self.get_event()
59
60        self.anchors = {}
61        return node
62
63    def compose_node(self, parent, index):
64        if self.check_event(AliasEvent):
65            event = self.get_event()
66            anchor = event.anchor
67            if anchor not in self.anchors:
68                raise ComposerError(None, None, "found undefined alias %r"
69                        % anchor, event.start_mark)
70            return self.anchors[anchor]
71        event = self.peek_event()
72        anchor = event.anchor
73        if anchor is not None:
74            if anchor in self.anchors:
75                raise ComposerError("found duplicate anchor %r; first occurrence"
76                        % anchor, self.anchors[anchor].start_mark,
77                        "second occurrence", event.start_mark)
78        self.descend_resolver(parent, index)
79        if self.check_event(ScalarEvent):
80            node = self.compose_scalar_node(anchor)
81        elif self.check_event(SequenceStartEvent):
82            node = self.compose_sequence_node(anchor)
83        elif self.check_event(MappingStartEvent):
84            node = self.compose_mapping_node(anchor)
85        self.ascend_resolver()
86        return node
87
88    def compose_scalar_node(self, anchor):
89        event = self.get_event()
90        tag = event.tag
91        if tag is None or tag == '!':
92            tag = self.resolve(ScalarNode, event.value, event.implicit)
93        node = ScalarNode(tag, event.value,
94                event.start_mark, event.end_mark, style=event.style)
95        if anchor is not None:
96            self.anchors[anchor] = node
97        return node
98
99    def compose_sequence_node(self, anchor):
100        start_event = self.get_event()
101        tag = start_event.tag
102        if tag is None or tag == '!':
103            tag = self.resolve(SequenceNode, None, start_event.implicit)
104        node = SequenceNode(tag, [],
105                start_event.start_mark, None,
106                flow_style=start_event.flow_style)
107        if anchor is not None:
108            self.anchors[anchor] = node
109        index = 0
110        while not self.check_event(SequenceEndEvent):
111            node.value.append(self.compose_node(node, index))
112            index += 1
113        end_event = self.get_event()
114        node.end_mark = end_event.end_mark
115        return node
116
117    def compose_mapping_node(self, anchor):
118        start_event = self.get_event()
119        tag = start_event.tag
120        if tag is None or tag == '!':
121            tag = self.resolve(MappingNode, None, start_event.implicit)
122        node = MappingNode(tag, [],
123                start_event.start_mark, None,
124                flow_style=start_event.flow_style)
125        if anchor is not None:
126            self.anchors[anchor] = node
127        while not self.check_event(MappingEndEvent):
128            #key_event = self.peek_event()
129            item_key = self.compose_node(node, None)
130            #if item_key in node.value:
131            #    raise ComposerError("while composing a mapping", start_event.start_mark,
132            #            "found duplicate key", key_event.start_mark)
133            item_value = self.compose_node(node, item_key)
134            #node.value[item_key] = item_value
135            node.value.append((item_key, item_value))
136        end_event = self.get_event()
137        node.end_mark = end_event.end_mark
138        return node
139
140