• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import itertools
2import sys
3import unittest
4from functools import partial
5from unittest import TestCase
6from unittest.mock import MagicMock, call, patch, ANY
7
8from .support import handle_all_events, code_to_events
9
10try:
11    from _pyrepl.console import Event
12    from _pyrepl.unix_console import UnixConsole
13except ImportError:
14    pass
15
16
17def unix_console(events, **kwargs):
18    console = UnixConsole()
19    console.get_event = MagicMock(side_effect=events)
20
21    height = kwargs.get("height", 25)
22    width = kwargs.get("width", 80)
23    console.getheightwidth = MagicMock(side_effect=lambda: (height, width))
24
25    console.prepare()
26    for key, val in kwargs.items():
27        setattr(console, key, val)
28    return console
29
30
31handle_events_unix_console = partial(
32    handle_all_events,
33    prepare_console=partial(unix_console),
34)
35handle_events_narrow_unix_console = partial(
36    handle_all_events,
37    prepare_console=partial(unix_console, width=5),
38)
39handle_events_short_unix_console = partial(
40    handle_all_events,
41    prepare_console=partial(unix_console, height=1),
42)
43handle_events_unix_console_height_3 = partial(
44    handle_all_events, prepare_console=partial(unix_console, height=3)
45)
46
47
48TERM_CAPABILITIES = {
49    "bel": b"\x07",
50    "civis": b"\x1b[?25l",
51    "clear": b"\x1b[H\x1b[2J",
52    "cnorm": b"\x1b[?12l\x1b[?25h",
53    "cub": b"\x1b[%p1%dD",
54    "cub1": b"\x08",
55    "cud": b"\x1b[%p1%dB",
56    "cud1": b"\n",
57    "cuf": b"\x1b[%p1%dC",
58    "cuf1": b"\x1b[C",
59    "cup": b"\x1b[%i%p1%d;%p2%dH",
60    "cuu": b"\x1b[%p1%dA",
61    "cuu1": b"\x1b[A",
62    "dch1": b"\x1b[P",
63    "dch": b"\x1b[%p1%dP",
64    "el": b"\x1b[K",
65    "hpa": b"\x1b[%i%p1%dG",
66    "ich": b"\x1b[%p1%d@",
67    "ich1": None,
68    "ind": b"\n",
69    "pad": None,
70    "ri": b"\x1bM",
71    "rmkx": b"\x1b[?1l\x1b>",
72    "smkx": b"\x1b[?1h\x1b=",
73}
74
75
76@unittest.skipIf(sys.platform == "win32", "No Unix event queue on Windows")
77@patch("_pyrepl.curses.tigetstr", lambda s: TERM_CAPABILITIES.get(s))
78@patch(
79    "_pyrepl.curses.tparm",
80    lambda s, *args: s + b":" + b",".join(str(i).encode() for i in args),
81)
82@patch("_pyrepl.curses.setupterm", lambda a, b: None)
83@patch(
84    "termios.tcgetattr",
85    lambda _: [
86        27394,
87        3,
88        19200,
89        536872399,
90        38400,
91        38400,
92        [
93            b"\x04",
94            b"\xff",
95            b"\xff",
96            b"\x7f",
97            b"\x17",
98            b"\x15",
99            b"\x12",
100            b"\x00",
101            b"\x03",
102            b"\x1c",
103            b"\x1a",
104            b"\x19",
105            b"\x11",
106            b"\x13",
107            b"\x16",
108            b"\x0f",
109            b"\x01",
110            b"\x00",
111            b"\x14",
112            b"\x00",
113        ],
114    ],
115)
116@patch("termios.tcsetattr", lambda a, b, c: None)
117@patch("os.write")
118class TestConsole(TestCase):
119    def test_simple_addition(self, _os_write):
120        code = "12+34"
121        events = code_to_events(code)
122        _, con = handle_events_unix_console(events)
123        _os_write.assert_any_call(ANY, b"1")
124        _os_write.assert_any_call(ANY, b"2")
125        _os_write.assert_any_call(ANY, b"+")
126        _os_write.assert_any_call(ANY, b"3")
127        _os_write.assert_any_call(ANY, b"4")
128        con.restore()
129
130    def test_wrap(self, _os_write):
131        code = "12+34"
132        events = code_to_events(code)
133        _, con = handle_events_narrow_unix_console(events)
134        _os_write.assert_any_call(ANY, b"1")
135        _os_write.assert_any_call(ANY, b"2")
136        _os_write.assert_any_call(ANY, b"+")
137        _os_write.assert_any_call(ANY, b"3")
138        _os_write.assert_any_call(ANY, b"\\")
139        _os_write.assert_any_call(ANY, b"\n")
140        _os_write.assert_any_call(ANY, b"4")
141        con.restore()
142
143    def test_cursor_left(self, _os_write):
144        code = "1"
145        events = itertools.chain(
146            code_to_events(code),
147            [Event(evt="key", data="left", raw=bytearray(b"\x1bOD"))],
148        )
149        _, con = handle_events_unix_console(events)
150        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cub"] + b":1")
151        con.restore()
152
153    def test_cursor_left_right(self, _os_write):
154        code = "1"
155        events = itertools.chain(
156            code_to_events(code),
157            [
158                Event(evt="key", data="left", raw=bytearray(b"\x1bOD")),
159                Event(evt="key", data="right", raw=bytearray(b"\x1bOC")),
160            ],
161        )
162        _, con = handle_events_unix_console(events)
163        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cub"] + b":1")
164        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cuf"] + b":1")
165        con.restore()
166
167    def test_cursor_up(self, _os_write):
168        code = "1\n2+3"
169        events = itertools.chain(
170            code_to_events(code),
171            [Event(evt="key", data="up", raw=bytearray(b"\x1bOA"))],
172        )
173        _, con = handle_events_unix_console(events)
174        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cuu"] + b":1")
175        con.restore()
176
177    def test_cursor_up_down(self, _os_write):
178        code = "1\n2+3"
179        events = itertools.chain(
180            code_to_events(code),
181            [
182                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
183                Event(evt="key", data="down", raw=bytearray(b"\x1bOB")),
184            ],
185        )
186        _, con = handle_events_unix_console(events)
187        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cuu"] + b":1")
188        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cud"] + b":1")
189        con.restore()
190
191    def test_cursor_back_write(self, _os_write):
192        events = itertools.chain(
193            code_to_events("1"),
194            [Event(evt="key", data="left", raw=bytearray(b"\x1bOD"))],
195            code_to_events("2"),
196        )
197        _, con = handle_events_unix_console(events)
198        _os_write.assert_any_call(ANY, b"1")
199        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cub"] + b":1")
200        _os_write.assert_any_call(ANY, b"2")
201        con.restore()
202
203    def test_multiline_function_move_up_short_terminal(self, _os_write):
204        # fmt: off
205        code = (
206            "def f():\n"
207            "  foo"
208        )
209        # fmt: on
210
211        events = itertools.chain(
212            code_to_events(code),
213            [
214                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
215                Event(evt="scroll", data=None),
216            ],
217        )
218        _, con = handle_events_short_unix_console(events)
219        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["ri"] + b":")
220        con.restore()
221
222    def test_multiline_function_move_up_down_short_terminal(self, _os_write):
223        # fmt: off
224        code = (
225            "def f():\n"
226            "  foo"
227        )
228        # fmt: on
229
230        events = itertools.chain(
231            code_to_events(code),
232            [
233                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
234                Event(evt="scroll", data=None),
235                Event(evt="key", data="down", raw=bytearray(b"\x1bOB")),
236                Event(evt="scroll", data=None),
237            ],
238        )
239        _, con = handle_events_short_unix_console(events)
240        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["ri"] + b":")
241        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["ind"] + b":")
242        con.restore()
243
244    def test_resize_bigger_on_multiline_function(self, _os_write):
245        # fmt: off
246        code = (
247            "def f():\n"
248            "  foo"
249        )
250        # fmt: on
251
252        events = itertools.chain(code_to_events(code))
253        reader, console = handle_events_short_unix_console(events)
254
255        console.height = 2
256        console.getheightwidth = MagicMock(lambda _: (2, 80))
257
258        def same_reader(_):
259            return reader
260
261        def same_console(events):
262            console.get_event = MagicMock(side_effect=events)
263            return console
264
265        _, con = handle_all_events(
266            [Event(evt="resize", data=None)],
267            prepare_reader=same_reader,
268            prepare_console=same_console,
269        )
270        _os_write.assert_has_calls(
271            [
272                call(ANY, TERM_CAPABILITIES["ri"] + b":"),
273                call(ANY, TERM_CAPABILITIES["cup"] + b":0,0"),
274                call(ANY, b"def f():"),
275            ]
276        )
277        console.restore()
278        con.restore()
279
280    def test_resize_smaller_on_multiline_function(self, _os_write):
281        # fmt: off
282        code = (
283            "def f():\n"
284            "  foo"
285        )
286        # fmt: on
287
288        events = itertools.chain(code_to_events(code))
289        reader, console = handle_events_unix_console_height_3(events)
290
291        console.height = 1
292        console.getheightwidth = MagicMock(lambda _: (1, 80))
293
294        def same_reader(_):
295            return reader
296
297        def same_console(events):
298            console.get_event = MagicMock(side_effect=events)
299            return console
300
301        _, con = handle_all_events(
302            [Event(evt="resize", data=None)],
303            prepare_reader=same_reader,
304            prepare_console=same_console,
305        )
306        _os_write.assert_has_calls(
307            [
308                call(ANY, TERM_CAPABILITIES["ind"] + b":"),
309                call(ANY, TERM_CAPABILITIES["cup"] + b":0,0"),
310                call(ANY, b"  foo"),
311            ]
312        )
313        console.restore()
314        con.restore()
315