import itertools
import sys
import unittest
from functools import partial
from unittest import TestCase
from unittest.mock import MagicMock, call, patch, ANY

from .support import handle_all_events, code_to_events

# If _pyrepl cannot be imported, Event and UnixConsole simply stay undefined;
# the import error is deliberately ignored here.
try:
    from _pyrepl.console import Event
    from _pyrepl.unix_console import UnixConsole
except ImportError:
    pass


def unix_console(events, **kwargs):
    """Build a prepared ``UnixConsole`` whose input and size are mocked.

    ``events`` becomes the ``side_effect`` of a mocked ``get_event``, so the
    console hands them out one per call.  ``height`` and ``width`` keyword
    arguments (defaults 25 and 80) are what the mocked ``getheightwidth``
    reports.  After ``prepare()`` has run, every keyword argument is also
    set as an attribute on the console.  Returns the console.
    """
    console = UnixConsole()
    console.get_event = MagicMock(side_effect=events)

    height = kwargs.get("height", 25)
    width = kwargs.get("width", 80)
    console.getheightwidth = MagicMock(side_effect=lambda: (height, width))

    console.prepare()
    # Applied after prepare() so the explicit values are the final ones.
    for key, val in kwargs.items():
        setattr(console, key, val)
    return console


# Variants of handle_all_events bound to differently sized mocked consoles.
handle_events_unix_console = partial(
    handle_all_events,
    prepare_console=unix_console,
)
handle_events_narrow_unix_console = partial(
    handle_all_events,
    prepare_console=partial(unix_console, width=5),
)
handle_events_short_unix_console = partial(
    handle_all_events,
    prepare_console=partial(unix_console, height=1),
)
handle_events_unix_console_height_3 = partial(
    handle_all_events, prepare_console=partial(unix_console, height=3)
)


# Capability name -> byte string returned by the patched tigetstr below.
# A value of None is what the patched lookup returns for that capability.
TERM_CAPABILITIES = {
    "bel": b"\x07",
    "civis": b"\x1b[?25l",
    "clear": b"\x1b[H\x1b[2J",
    "cnorm": b"\x1b[?12l\x1b[?25h",
    "cub": b"\x1b[%p1%dD",
    "cub1": b"\x08",
    "cud": b"\x1b[%p1%dB",
    "cud1": b"\n",
    "cuf": b"\x1b[%p1%dC",
    "cuf1": b"\x1b[C",
    "cup": b"\x1b[%i%p1%d;%p2%dH",
    "cuu": b"\x1b[%p1%dA",
    "cuu1": b"\x1b[A",
    "dch1": b"\x1b[P",
    "dch": b"\x1b[%p1%dP",
    "el": b"\x1b[K",
    "hpa": b"\x1b[%i%p1%dG",
    "ich": b"\x1b[%p1%d@",
    "ich1": None,
    "ind": b"\n",
    "pad": None,
    "ri": b"\x1bM",
    "rmkx": b"\x1b[?1l\x1b>",
    "smkx": b"\x1b[?1h\x1b=",
}


# The patches below replace the curses/termios entry points with fakes:
#   * tigetstr looks capabilities up in TERM_CAPABILITIES;
#   * tparm returns the capability followed by b":" and the comma-joined
#     arguments, which is why the tests expect writes such as cap + b":1";
#   * tcgetattr returns a fixed attribute list and tcsetattr does nothing;
#   * os.write is a mock handed to every test method as ``_os_write``.
@unittest.skipIf(sys.platform == "win32", "No Unix event queue on Windows")
@patch("_pyrepl.curses.tigetstr", lambda s: TERM_CAPABILITIES.get(s))
@patch(
    "_pyrepl.curses.tparm",
    lambda s, *args: s + b":" + b",".join(str(i).encode() for i in args),
)
@patch("_pyrepl.curses.setupterm", lambda a, b: None)
@patch(
    "termios.tcgetattr",
    lambda _: [
        27394,
        3,
        19200,
        536872399,
        38400,
        38400,
        [
            b"\x04",
            b"\xff",
            b"\xff",
            b"\x7f",
            b"\x17",
            b"\x15",
            b"\x12",
            b"\x00",
            b"\x03",
            b"\x1c",
            b"\x1a",
            b"\x19",
            b"\x11",
            b"\x13",
            b"\x16",
            b"\x0f",
            b"\x01",
            b"\x00",
            b"\x14",
            b"\x00",
        ],
    ],
)
@patch("termios.tcsetattr", lambda a, b, c: None)
@patch("os.write")
class TestConsole(TestCase):
    def test_simple_addition(self, _os_write):
        # Each typed character must show up as its own os.write payload.
        code = "12+34"
        events = code_to_events(code)
        _, con = handle_events_unix_console(events)
        _os_write.assert_any_call(ANY, b"1")
        _os_write.assert_any_call(ANY, b"2")
        _os_write.assert_any_call(ANY, b"+")
        _os_write.assert_any_call(ANY, b"3")
        _os_write.assert_any_call(ANY, b"4")
        con.restore()

    def test_wrap(self, _os_write):
        # On the width=5 console the same input additionally produces a
        # backslash and a newline write.
        code = "12+34"
        events = code_to_events(code)
        _, con = handle_events_narrow_unix_console(events)
        _os_write.assert_any_call(ANY, b"1")
        _os_write.assert_any_call(ANY, b"2")
        _os_write.assert_any_call(ANY, b"+")
        _os_write.assert_any_call(ANY, b"3")
        _os_write.assert_any_call(ANY, b"\\")
        _os_write.assert_any_call(ANY, b"\n")
        _os_write.assert_any_call(ANY, b"4")
        con.restore()

    def test_cursor_left(self, _os_write):
        # A "left" key after typing must emit the "cub" capability with 1.
        code = "1"
        events = itertools.chain(
            code_to_events(code),
            [Event(evt="key", data="left", raw=bytearray(b"\x1bOD"))],
        )
        _, con = handle_events_unix_console(events)
        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cub"] + b":1")
        con.restore()

    def test_cursor_left_right(self, _os_write):
        # "left" then "right" must emit "cub" and "cuf", each with 1.
        code = "1"
        events = itertools.chain(
            code_to_events(code),
            [
                Event(evt="key", data="left", raw=bytearray(b"\x1bOD")),
                Event(evt="key", data="right", raw=bytearray(b"\x1bOC")),
            ],
        )
        _, con = handle_events_unix_console(events)
        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cub"] + b":1")
        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cuf"] + b":1")
        con.restore()

    def test_cursor_up(self, _os_write):
        # An "up" key on two-line input must emit the "cuu" capability.
        code = "1\n2+3"
        events = itertools.chain(
            code_to_events(code),
            [Event(evt="key", data="up", raw=bytearray(b"\x1bOA"))],
        )
        _, con = handle_events_unix_console(events)
        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cuu"] + b":1")
        con.restore()

    def test_cursor_up_down(self, _os_write):
        # "up" then "down" must emit "cuu" and "cud", each with 1.
        code = "1\n2+3"
        events = itertools.chain(
            code_to_events(code),
            [
                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
                Event(evt="key", data="down", raw=bytearray(b"\x1bOB")),
            ],
        )
        _, con = handle_events_unix_console(events)
        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cuu"] + b":1")
        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cud"] + b":1")
        con.restore()

    def test_cursor_back_write(self, _os_write):
        # Type "1", move left, type "2": all three writes must be present.
        events = itertools.chain(
            code_to_events("1"),
            [Event(evt="key", data="left", raw=bytearray(b"\x1bOD"))],
            code_to_events("2"),
        )
        _, con = handle_events_unix_console(events)
        _os_write.assert_any_call(ANY, b"1")
        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cub"] + b":1")
        _os_write.assert_any_call(ANY, b"2")
        con.restore()

    def test_multiline_function_move_up_short_terminal(self, _os_write):
        # fmt: off
        code = (
            "def f():\n"
            " foo"
        )
        # fmt: on

        # On the height=1 console, "up" followed by a scroll event must
        # emit the "ri" capability.
        events = itertools.chain(
            code_to_events(code),
            [
                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
                Event(evt="scroll", data=None),
            ],
        )
        _, con = handle_events_short_unix_console(events)
        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["ri"] + b":")
        con.restore()

    def test_multiline_function_move_up_down_short_terminal(self, _os_write):
        # fmt: off
        code = (
            "def f():\n"
            " foo"
        )
        # fmt: on

        # Up/scroll then down/scroll on the height=1 console must emit
        # both the "ri" and the "ind" capabilities.
        events = itertools.chain(
            code_to_events(code),
            [
                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
                Event(evt="scroll", data=None),
                Event(evt="key", data="down", raw=bytearray(b"\x1bOB")),
                Event(evt="scroll", data=None),
            ],
        )
        _, con = handle_events_short_unix_console(events)
        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["ri"] + b":")
        _os_write.assert_any_call(ANY, TERM_CAPABILITIES["ind"] + b":")
        con.restore()

    def test_resize_bigger_on_multiline_function(self, _os_write):
        # fmt: off
        code = (
            "def f():\n"
            " foo"
        )
        # fmt: on

        events = itertools.chain(code_to_events(code))
        reader, console = handle_events_short_unix_console(events)

        # Grow the console from one row to two.  return_value is required:
        # a callable passed positionally to MagicMock is taken as ``spec``
        # and would never be invoked.
        console.height = 2
        console.getheightwidth = MagicMock(return_value=(2, 80))

        # Second pass: feed the resize event to the reader and console
        # created by the first pass instead of building new ones.
        def same_reader(_):
            return reader

        def same_console(events):
            console.get_event = MagicMock(side_effect=events)
            return console

        _, con = handle_all_events(
            [Event(evt="resize", data=None)],
            prepare_reader=same_reader,
            prepare_console=same_console,
        )
        # assert_has_calls requires these writes as one consecutive run.
        _os_write.assert_has_calls(
            [
                call(ANY, TERM_CAPABILITIES["ri"] + b":"),
                call(ANY, TERM_CAPABILITIES["cup"] + b":0,0"),
                call(ANY, b"def f():"),
            ]
        )
        console.restore()
        con.restore()

    def test_resize_smaller_on_multiline_function(self, _os_write):
        # fmt: off
        code = (
            "def f():\n"
            " foo"
        )
        # fmt: on

        events = itertools.chain(code_to_events(code))
        reader, console = handle_events_unix_console_height_3(events)

        # Shrink the console from three rows to one.  return_value is
        # required: a callable passed positionally to MagicMock is taken
        # as ``spec`` and would never be invoked.
        console.height = 1
        console.getheightwidth = MagicMock(return_value=(1, 80))

        # Second pass: feed the resize event to the reader and console
        # created by the first pass instead of building new ones.
        def same_reader(_):
            return reader

        def same_console(events):
            console.get_event = MagicMock(side_effect=events)
            return console

        _, con = handle_all_events(
            [Event(evt="resize", data=None)],
            prepare_reader=same_reader,
            prepare_console=same_console,
        )
        # assert_has_calls requires these writes as one consecutive run.
        _os_write.assert_has_calls(
            [
                call(ANY, TERM_CAPABILITIES["ind"] + b":"),
                call(ANY, TERM_CAPABILITIES["cup"] + b":0,0"),
                call(ANY, b" foo"),
            ]
        )
        console.restore()
        con.restore()