• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import itertools
2
3from ShCommands import Command, Pipeline
4
def tcl_preprocess(data):
    """Apply Tcl's backslash-newline preprocessing to *data*.

    Every occurrence of a backslash immediately followed by a newline,
    together with any spaces or tabs that directly follow it, is replaced by
    a single space.  Input without such a sequence is returned unchanged.

    data -- the script text to preprocess.

    Returns the preprocessed string.
    """
    marker = '\\\n'
    found = data.find(marker)
    if found == -1:
        return data

    n = len(data)
    pieces = []
    pos = 0
    while found != -1:
        # Keep the text before the continuation ...
        pieces.append(data[pos:found])
        # ... then skip the marker and the indentation that follows it.
        pos = found + len(marker)
        while pos < n and data[pos] in ' \t':
            pos += 1
        found = data.find(marker, pos)
    pieces.append(data[pos:])

    # Joining with ' ' inserts exactly one space per folded continuation.
    return ' '.join(pieces)
18
class TclLexer:
    """TclLexer - Lex a string into "words", following the Tcl syntax.

    The input is run through tcl_preprocess() first.  lex() then yields one
    item per word: a plain string for a word, or the tuple (';',) for a
    command terminator (';' or newline).  Command, variable and general
    argument substitution are not implemented and raise NotImplementedError.
    """

    # Backslash escapes that stand for a single control character.
    _SIMPLE_ESCAPES = {
        'a': '\x07',
        'b': '\x08',
        'f': '\x0c',
        'n': '\n',
        'r': '\r',
        't': '\t',
        'v': '\x0b',
    }

    def __init__(self, data):
        self.data = tcl_preprocess(data)
        self.pos = 0
        self.end = len(self.data)

    def at_end(self):
        """Return True once every character has been consumed."""
        return self.pos == self.end

    def eat(self):
        """Consume and return the next character."""
        c = self.data[self.pos]
        self.pos += 1
        return c

    def look(self):
        """Return the next character without consuming it."""
        return self.data[self.pos]

    def maybe_eat(self, c):
        """
        maybe_eat(c) - Consume the character c if it is the next character,
        returning True if a character was consumed. """
        # Guard against the end of input so callers get a clean False (and
        # the lexing routines can report a proper "unterminated" error)
        # instead of an IndexError.
        if not self.at_end() and self.data[self.pos] == c:
            self.pos += 1
            return True
        return False

    def escape(self, c):
        """Return the character denoted by the backslash escape '\\' + c.

        Unicode, hex and octal escapes ('u', 'x', 'o') are rejected with
        ValueError; any other character escapes to itself.
        """
        if c in self._SIMPLE_ESCAPES:
            return self._SIMPLE_ESCAPES[c]
        if c in 'uxo':
            raise ValueError('Invalid quoted character %r' % c)
        return c

    def lex_braced(self):
        """Lex a brace-quoted word and return its contents.

        The opening brace has already been consumed; lexing continues up to
        the matching closing brace.  Nested brace pairs are kept verbatim, and
        a backslash before '{' or '}' yields just the brace.  Raises
        ValueError if the input ends before the closing brace.
        """
        parts = []
        while 1:
            if self.at_end():
                raise ValueError("Unterminated '{' quoted word")

            c = self.eat()
            if c == '}':
                break
            elif c == '{':
                parts.append('{' + self.lex_braced() + '}')
            elif c == '\\' and not self.at_end() and self.look() in '{}':
                # A trailing backslash at end of input falls through to the
                # plain-character case, and the next iteration reports the
                # unterminated word.
                parts.append(self.eat())
            else:
                parts.append(c)

        return ''.join(parts)

    def lex_quoted(self):
        """Lex a double-quoted word and return its contents.

        The opening quote has already been consumed.  Backslash escapes are
        translated with escape().  Raises ValueError if the closing quote or
        the character after a backslash is missing.
        """
        parts = []

        while 1:
            if self.at_end():
                raise ValueError("Unterminated '\"' quoted word")

            c = self.eat()
            if c == '"':
                break
            elif c == '\\':
                if self.at_end():
                    raise ValueError('Missing quoted character')

                parts.append(self.escape(self.eat()))
            else:
                parts.append(c)

        return ''.join(parts)

    def lex_unquoted(self, process_all=False):
        """Lex a bare word and return it.

        Lexing stops at whitespace, ';' or the end of the string; with
        process_all=True only the end of the string stops it.  Backslash
        escapes are translated with escape().  Raises ValueError for a
        dangling backslash, and NotImplementedError for '[' (command
        substitution) or '$' followed by a letter or '{' (variable
        substitution).
        """
        parts = []
        while not self.at_end():
            if not process_all:
                if self.look().isspace() or self.look() == ';':
                    break

            c = self.eat()
            if c == '\\':
                if self.at_end():
                    raise ValueError('Missing quoted character')

                parts.append(self.escape(self.eat()))
            elif c == '[':
                raise NotImplementedError('Command substitution is '
                                          'not supported')
            elif c == '$' and not self.at_end() and (self.look().isalpha() or
                                                     self.look() == '{'):
                raise NotImplementedError('Variable substitution is '
                                          'not supported')
            else:
                parts.append(c)

        return ''.join(parts)

    def lex_one_token(self):
        """Lex and return a single word starting at the current position."""
        if self.maybe_eat('"'):
            return self.lex_quoted()
        elif self.maybe_eat('{'):
            # Check for argument substitution.
            if not self.maybe_eat('*'):
                return self.lex_braced()

            # '{*' not immediately closed: an ordinary braced word that
            # happens to start with '*'.
            if not self.maybe_eat('}'):
                return '*' + self.lex_braced()

            # A free-standing '{*}' is lexed as the word '*'.
            if self.at_end() or self.look().isspace():
                return '*'

            raise NotImplementedError("Argument substitution is unsupported")
        else:
            return self.lex_unquoted()

    def lex(self):
        """Generate the words of the input.

        Yields a string per word and (';',) per ';' or newline.
        """
        while not self.at_end():
            c = self.look()
            if c in ';\n':
                self.eat()
                yield (';',)
            elif c.isspace():
                # Skip every other kind of whitespace, not only ' ' and
                # '\t': lex_unquoted() stops at any whitespace without
                # consuming it, so leaving e.g. '\r' here would make this
                # loop yield empty words forever.
                self.eat()
            else:
                yield self.lex_one_token()
162
class TclExecCommand:
    """Parser for the argument words of a Tcl 'exec' command.

    args is an iterable of words (as produced by TclLexer.lex()).  parse()
    returns a tuple (ignoreStderr, keepNewline, pipeline).
    """

    # Recognized redirection operators, grouped by operator length.
    kRedirectPrefixes1 = ('<', '>')
    kRedirectPrefixes2 = ('<@', '<<', '2>', '>&', '>>', '>@')
    kRedirectPrefixes3 = ('2>@', '2>>', '>>&', '>&@')
    kRedirectPrefixes4 = ('2>@1',)

    def __init__(self, args):
        self.args = iter(args)

    def lex(self):
        """Consume and return the next word, or None when none are left."""
        return next(self.args, None)

    def look(self):
        """Return the next word without consuming it (None at the end)."""
        tok = self.lex()
        if tok is not None:
            # Push the word back in front of the remaining ones.
            self.args = itertools.chain([tok], self.args)
        return tok

    def parse_redirect(self, tok, length):
        """Parse one redirection whose operator is the first length
        characters of tok.

        The redirection target is the rest of tok when present, otherwise the
        next word.  Returns (op, arg) where op is (operator,) or, for
        operators written with a leading '2', (operator-without-'2', 2).
        Raises ValueError if the target is missing.
        """
        if len(tok) == length:
            arg = self.lex()
            if arg is None:
                raise ValueError('Missing argument to %r redirection' % tok)
        else:
            tok, arg = tok[:length], tok[length:]

        if tok[0] == '2':
            op = (tok[1:], 2)
        else:
            op = (tok,)
        return (op, arg)

    def parse_pipeline(self):
        """Parse the remaining words into a Pipeline of Commands.

        Raises ValueError if there are no words left.
        """
        if self.look() is None:
            raise ValueError("Expected at least one argument to exec")

        # Longest operators first, so that e.g. '2>@1' is not taken for '2>'.
        prefixes_by_length = (
            (4, TclExecCommand.kRedirectPrefixes4),
            (3, TclExecCommand.kRedirectPrefixes3),
            (2, TclExecCommand.kRedirectPrefixes2),
            (1, TclExecCommand.kRedirectPrefixes1),
        )

        commands = [Command([], [])]
        while 1:
            arg = self.lex()
            if arg is None:
                break
            elif arg == '|':
                commands.append(Command([], []))
            elif arg == '|&':
                # Write this as a redirect of stderr; it must come first because
                # stdout may have already been redirected.
                commands[-1].redirects.insert(0, (('>&', 2), '1'))
                commands.append(Command([], []))
            else:
                for length, prefixes in prefixes_by_length:
                    if arg[:length] in prefixes:
                        commands[-1].redirects.append(
                            self.parse_redirect(arg, length))
                        break
                else:
                    # Not a redirection: an ordinary command argument.
                    commands[-1].args.append(arg)

        return Pipeline(commands, False, pipe_err=True)

    def parse(self):
        """Parse leading exec options followed by the pipeline.

        Recognizes '-ignorestderr', '-keepnewline' and the option terminator
        '--'.  Returns (ignoreStderr, keepNewline, pipeline).  Raises
        ValueError for any other word starting with '-' and when no command
        words follow the options.
        """
        ignoreStderr = False
        keepNewline = False

        # Parse arguments.
        while 1:
            tok = self.look()
            # startswith() rather than tok[0] so an empty word is treated as
            # an ordinary argument instead of raising IndexError.
            if not isinstance(tok, str) or not tok.startswith('-'):
                break

            if tok == '--':
                self.lex()
                break
            elif tok == '-ignorestderr':
                ignoreStderr = True
            elif tok == '-keepnewline':
                keepNewline = True
            else:
                raise ValueError("Invalid exec argument %r" % tok)

            # Consume the option just handled; without this the loop would
            # look at the same word forever.
            self.lex()

        return (ignoreStderr, keepNewline, self.parse_pipeline())
248
249###
250
251import unittest
252
class TestTclLexer(unittest.TestCase):
    """Unit tests for tcl_preprocess() and TclLexer."""

    def lex(self, str, *args, **kwargs):
        """Lex the given text with TclLexer and return the words as a list."""
        lexer = TclLexer(str, *args, **kwargs)
        return list(lexer.lex())

    def check_lex(self, cases):
        """Assert that each (source, expected words) pair lexes as expected."""
        for source, expected in cases:
            self.assertEqual(self.lex(source), expected)

    def test_preprocess(self):
        cases = (
            ('a b', 'a b'),
            ('a\\\nb c', 'a b c'),
        )
        for source, expected in cases:
            self.assertEqual(tcl_preprocess(source), expected)

    def test_unquoted(self):
        self.check_lex((
            ('a b c', ['a', 'b', 'c']),
            (r'a\nb\tc\ ', ['a\nb\tc ']),
            (r'a \\\$b c $\\', ['a', r'\$b', 'c', '$\\']),
        ))

    def test_braced(self):
        self.check_lex((
            ('a {b c} {}', ['a', 'b c', '']),
            (r'a {b {c\n}}', ['a', 'b {c\\n}']),
            (r'a {b\{}', ['a', 'b{']),
            (r'{*}', ['*']),
            (r'{*} a', ['*', 'a']),
            # Same case as the previous row; it is checked twice.
            (r'{*} a', ['*', 'a']),
            ('{a\\\n   b}', ['a b']),
        ))

    def test_quoted(self):
        self.check_lex((
            ('a "b c"', ['a', 'b c']),
        ))

    def test_terminators(self):
        # Newline and ';' both produce the (';',) terminator token.
        terminated = ['a', (';',), 'b']
        self.check_lex((
            ('a\nb', terminated),
            ('a;b', terminated),
            ('a   ;   b', terminated),
        ))
293
class TestTclExecCommand(unittest.TestCase):
    """Unit tests for TclExecCommand.parse()."""

    def parse(self, str):
        """Lex the given text and parse the words as exec arguments."""
        words = list(TclLexer(str).lex())
        return TclExecCommand(words).parse()

    def test_basic(self):
        # A single command.
        actual = self.parse('echo hello')
        echo_only = Pipeline([Command(['echo', 'hello'], [])],
                             False, True)
        self.assertEqual(actual, (False, False, echo_only))

        # Two commands joined by a pipe.
        actual = self.parse('echo hello | grep hello')
        echo_grep = Pipeline([Command(['echo', 'hello'], []),
                              Command(['grep', 'hello'], [])],
                             False, True)
        self.assertEqual(actual, (False, False, echo_grep))

    def test_redirect(self):
        actual = self.parse('echo hello > a >b >>c 2> d |& e')
        # The '|&' stderr redirect is expected first, ahead of the others.
        redirects = [
            (('>&',2),'1'),
            (('>',),'a'),
            (('>',),'b'),
            (('>>',),'c'),
            (('>',2),'d'),
        ]
        expected = Pipeline([Command(['echo', 'hello'], redirects),
                             Command(['e'], [])],
                            False, True)
        self.assertEqual(actual, (False, False, expected))
320
# Run the unit tests defined in this file when it is executed as a script.
if __name__ == '__main__':
    unittest.main()
323