1from test.support import verbose, reap_children 2from test.support.import_helper import import_module 3 4# Skip these tests if termios is not available 5import_module('termios') 6 7import errno 8import os 9import pty 10import tty 11import sys 12import select 13import signal 14import socket 15import io # readline 16import unittest 17 18import struct 19import tty 20import fcntl 21import warnings 22 23TEST_STRING_1 = b"I wish to buy a fish license.\n" 24TEST_STRING_2 = b"For my pet fish, Eric.\n" 25 26try: 27 _TIOCGWINSZ = tty.TIOCGWINSZ 28 _TIOCSWINSZ = tty.TIOCSWINSZ 29 _HAVE_WINSZ = True 30except AttributeError: 31 _HAVE_WINSZ = False 32 33if verbose: 34 def debug(msg): 35 print(msg) 36else: 37 def debug(msg): 38 pass 39 40 41# Note that os.read() is nondeterministic so we need to be very careful 42# to make the test suite deterministic. A normal call to os.read() may 43# give us less than expected. 44# 45# Beware, on my Linux system, if I put 'foo\n' into a terminal fd, I get 46# back 'foo\r\n' at the other end. The behavior depends on the termios 47# setting. The newline translation may be OS-specific. To make the 48# test suite deterministic and OS-independent, the functions _readline 49# and normalize_output can be used. 50 51def normalize_output(data): 52 # Some operating systems do conversions on newline. We could possibly fix 53 # that by doing the appropriate termios.tcsetattr()s. I couldn't figure out 54 # the right combo on Tru64. So, just normalize the output and doc the 55 # problem O/Ses by allowing certain combinations for some platforms, but 56 # avoid allowing other differences (like extra whitespace, trailing garbage, 57 # etc.) 58 59 # This is about the best we can do without getting some feedback 60 # from someone more knowledgable. 61 62 # OSF/1 (Tru64) apparently turns \n into \r\r\n. 63 if data.endswith(b'\r\r\n'): 64 return data.replace(b'\r\r\n', b'\n') 65 66 if data.endswith(b'\r\n'): 67 return data.replace(b'\r\n', b'\n') 68 69 return data 70 71def _readline(fd): 72 """Read one line. May block forever if no newline is read.""" 73 reader = io.FileIO(fd, mode='rb', closefd=False) 74 return reader.readline() 75 76def expectedFailureIfStdinIsTTY(fun): 77 # avoid isatty() 78 try: 79 tty.tcgetattr(pty.STDIN_FILENO) 80 return unittest.expectedFailure(fun) 81 except tty.error: 82 pass 83 return fun 84 85def _get_term_winsz(fd): 86 s = struct.pack("HHHH", 0, 0, 0, 0) 87 return fcntl.ioctl(fd, _TIOCGWINSZ, s) 88 89def _set_term_winsz(fd, winsz): 90 fcntl.ioctl(fd, _TIOCSWINSZ, winsz) 91 92 93# Marginal testing of pty suite. Cannot do extensive 'do or fail' testing 94# because pty code is not too portable. 95class PtyTest(unittest.TestCase): 96 def setUp(self): 97 old_alarm = signal.signal(signal.SIGALRM, self.handle_sig) 98 self.addCleanup(signal.signal, signal.SIGALRM, old_alarm) 99 100 old_sighup = signal.signal(signal.SIGHUP, self.handle_sighup) 101 self.addCleanup(signal.signal, signal.SIGHUP, old_sighup) 102 103 # isatty() and close() can hang on some platforms. Set an alarm 104 # before running the test to make sure we don't hang forever. 105 self.addCleanup(signal.alarm, 0) 106 signal.alarm(10) 107 108 # Save original stdin window size 109 self.stdin_rows = None 110 self.stdin_cols = None 111 if _HAVE_WINSZ: 112 try: 113 stdin_dim = os.get_terminal_size(pty.STDIN_FILENO) 114 self.stdin_rows = stdin_dim.lines 115 self.stdin_cols = stdin_dim.columns 116 old_stdin_winsz = struct.pack("HHHH", self.stdin_rows, 117 self.stdin_cols, 0, 0) 118 self.addCleanup(_set_term_winsz, pty.STDIN_FILENO, old_stdin_winsz) 119 except OSError: 120 pass 121 122 def handle_sig(self, sig, frame): 123 self.fail("isatty hung") 124 125 @staticmethod 126 def handle_sighup(signum, frame): 127 pass 128 129 @expectedFailureIfStdinIsTTY 130 def test_openpty(self): 131 try: 132 mode = tty.tcgetattr(pty.STDIN_FILENO) 133 except tty.error: 134 # not a tty or bad/closed fd 135 debug("tty.tcgetattr(pty.STDIN_FILENO) failed") 136 mode = None 137 138 new_stdin_winsz = None 139 if self.stdin_rows is not None and self.stdin_cols is not None: 140 try: 141 # Modify pty.STDIN_FILENO window size; we need to 142 # check if pty.openpty() is able to set pty slave 143 # window size accordingly. 144 debug("Setting pty.STDIN_FILENO window size") 145 debug(f"original size: (rows={self.stdin_rows}, cols={self.stdin_cols})") 146 target_stdin_rows = self.stdin_rows + 1 147 target_stdin_cols = self.stdin_cols + 1 148 debug(f"target size: (rows={target_stdin_rows}, cols={target_stdin_cols})") 149 target_stdin_winsz = struct.pack("HHHH", target_stdin_rows, 150 target_stdin_cols, 0, 0) 151 _set_term_winsz(pty.STDIN_FILENO, target_stdin_winsz) 152 153 # Were we able to set the window size 154 # of pty.STDIN_FILENO successfully? 155 new_stdin_winsz = _get_term_winsz(pty.STDIN_FILENO) 156 self.assertEqual(new_stdin_winsz, target_stdin_winsz, 157 "pty.STDIN_FILENO window size unchanged") 158 except OSError: 159 warnings.warn("Failed to set pty.STDIN_FILENO window size") 160 pass 161 162 try: 163 debug("Calling pty.openpty()") 164 try: 165 master_fd, slave_fd = pty.openpty(mode, new_stdin_winsz) 166 except TypeError: 167 master_fd, slave_fd = pty.openpty() 168 debug(f"Got master_fd '{master_fd}', slave_fd '{slave_fd}'") 169 except OSError: 170 # " An optional feature could not be imported " ... ? 171 raise unittest.SkipTest("Pseudo-terminals (seemingly) not functional.") 172 173 # closing master_fd can raise a SIGHUP if the process is 174 # the session leader: we installed a SIGHUP signal handler 175 # to ignore this signal. 176 self.addCleanup(os.close, master_fd) 177 self.addCleanup(os.close, slave_fd) 178 179 self.assertTrue(os.isatty(slave_fd), "slave_fd is not a tty") 180 181 if mode: 182 self.assertEqual(tty.tcgetattr(slave_fd), mode, 183 "openpty() failed to set slave termios") 184 if new_stdin_winsz: 185 self.assertEqual(_get_term_winsz(slave_fd), new_stdin_winsz, 186 "openpty() failed to set slave window size") 187 188 # Ensure the fd is non-blocking in case there's nothing to read. 189 blocking = os.get_blocking(master_fd) 190 try: 191 os.set_blocking(master_fd, False) 192 try: 193 s1 = os.read(master_fd, 1024) 194 self.assertEqual(b'', s1) 195 except OSError as e: 196 if e.errno != errno.EAGAIN: 197 raise 198 finally: 199 # Restore the original flags. 200 os.set_blocking(master_fd, blocking) 201 202 debug("Writing to slave_fd") 203 os.write(slave_fd, TEST_STRING_1) 204 s1 = _readline(master_fd) 205 self.assertEqual(b'I wish to buy a fish license.\n', 206 normalize_output(s1)) 207 208 debug("Writing chunked output") 209 os.write(slave_fd, TEST_STRING_2[:5]) 210 os.write(slave_fd, TEST_STRING_2[5:]) 211 s2 = _readline(master_fd) 212 self.assertEqual(b'For my pet fish, Eric.\n', normalize_output(s2)) 213 214 def test_fork(self): 215 debug("calling pty.fork()") 216 pid, master_fd = pty.fork() 217 self.addCleanup(os.close, master_fd) 218 if pid == pty.CHILD: 219 # stdout should be connected to a tty. 220 if not os.isatty(1): 221 debug("Child's fd 1 is not a tty?!") 222 os._exit(3) 223 224 # After pty.fork(), the child should already be a session leader. 225 # (on those systems that have that concept.) 226 debug("In child, calling os.setsid()") 227 try: 228 os.setsid() 229 except OSError: 230 # Good, we already were session leader 231 debug("Good: OSError was raised.") 232 pass 233 except AttributeError: 234 # Have pty, but not setsid()? 235 debug("No setsid() available?") 236 pass 237 except: 238 # We don't want this error to propagate, escaping the call to 239 # os._exit() and causing very peculiar behavior in the calling 240 # regrtest.py ! 241 # Note: could add traceback printing here. 242 debug("An unexpected error was raised.") 243 os._exit(1) 244 else: 245 debug("os.setsid() succeeded! (bad!)") 246 os._exit(2) 247 os._exit(4) 248 else: 249 debug("Waiting for child (%d) to finish." % pid) 250 # In verbose mode, we have to consume the debug output from the 251 # child or the child will block, causing this test to hang in the 252 # parent's waitpid() call. The child blocks after a 253 # platform-dependent amount of data is written to its fd. On 254 # Linux 2.6, it's 4000 bytes and the child won't block, but on OS 255 # X even the small writes in the child above will block it. Also 256 # on Linux, the read() will raise an OSError (input/output error) 257 # when it tries to read past the end of the buffer but the child's 258 # already exited, so catch and discard those exceptions. It's not 259 # worth checking for EIO. 260 while True: 261 try: 262 data = os.read(master_fd, 80) 263 except OSError: 264 break 265 if not data: 266 break 267 sys.stdout.write(str(data.replace(b'\r\n', b'\n'), 268 encoding='ascii')) 269 270 ##line = os.read(master_fd, 80) 271 ##lines = line.replace('\r\n', '\n').split('\n') 272 ##if False and lines != ['In child, calling os.setsid()', 273 ## 'Good: OSError was raised.', '']: 274 ## raise TestFailed("Unexpected output from child: %r" % line) 275 276 (pid, status) = os.waitpid(pid, 0) 277 res = os.waitstatus_to_exitcode(status) 278 debug("Child (%d) exited with code %d (status %d)." % (pid, res, status)) 279 if res == 1: 280 self.fail("Child raised an unexpected exception in os.setsid()") 281 elif res == 2: 282 self.fail("pty.fork() failed to make child a session leader.") 283 elif res == 3: 284 self.fail("Child spawned by pty.fork() did not have a tty as stdout") 285 elif res != 4: 286 self.fail("pty.fork() failed for unknown reasons.") 287 288 ##debug("Reading from master_fd now that the child has exited") 289 ##try: 290 ## s1 = os.read(master_fd, 1024) 291 ##except OSError: 292 ## pass 293 ##else: 294 ## raise TestFailed("Read from master_fd did not raise exception") 295 296 def test_master_read(self): 297 # XXX(nnorwitz): this test leaks fds when there is an error. 298 debug("Calling pty.openpty()") 299 master_fd, slave_fd = pty.openpty() 300 debug(f"Got master_fd '{master_fd}', slave_fd '{slave_fd}'") 301 302 self.addCleanup(os.close, master_fd) 303 304 debug("Closing slave_fd") 305 os.close(slave_fd) 306 307 debug("Reading from master_fd") 308 try: 309 data = os.read(master_fd, 1) 310 except OSError: # Linux 311 data = b"" 312 313 self.assertEqual(data, b"") 314 315 def test_spawn_doesnt_hang(self): 316 pty.spawn([sys.executable, '-c', 'print("hi there")']) 317 318class SmallPtyTests(unittest.TestCase): 319 """These tests don't spawn children or hang.""" 320 321 def setUp(self): 322 self.orig_stdin_fileno = pty.STDIN_FILENO 323 self.orig_stdout_fileno = pty.STDOUT_FILENO 324 self.orig_pty_close = pty.close 325 self.orig_pty__copy = pty._copy 326 self.orig_pty_fork = pty.fork 327 self.orig_pty_select = pty.select 328 self.orig_pty_setraw = pty.setraw 329 self.orig_pty_tcgetattr = pty.tcgetattr 330 self.orig_pty_tcsetattr = pty.tcsetattr 331 self.orig_pty_waitpid = pty.waitpid 332 self.fds = [] # A list of file descriptors to close. 333 self.files = [] 334 self.select_rfds_lengths = [] 335 self.select_rfds_results = [] 336 self.tcsetattr_mode_setting = None 337 338 def tearDown(self): 339 pty.STDIN_FILENO = self.orig_stdin_fileno 340 pty.STDOUT_FILENO = self.orig_stdout_fileno 341 pty.close = self.orig_pty_close 342 pty._copy = self.orig_pty__copy 343 pty.fork = self.orig_pty_fork 344 pty.select = self.orig_pty_select 345 pty.setraw = self.orig_pty_setraw 346 pty.tcgetattr = self.orig_pty_tcgetattr 347 pty.tcsetattr = self.orig_pty_tcsetattr 348 pty.waitpid = self.orig_pty_waitpid 349 for file in self.files: 350 try: 351 file.close() 352 except OSError: 353 pass 354 for fd in self.fds: 355 try: 356 os.close(fd) 357 except OSError: 358 pass 359 360 def _pipe(self): 361 pipe_fds = os.pipe() 362 self.fds.extend(pipe_fds) 363 return pipe_fds 364 365 def _socketpair(self): 366 socketpair = socket.socketpair() 367 self.files.extend(socketpair) 368 return socketpair 369 370 def _mock_select(self, rfds, wfds, xfds, timeout=0): 371 # This will raise IndexError when no more expected calls exist. 372 # This ignores the timeout 373 self.assertEqual(self.select_rfds_lengths.pop(0), len(rfds)) 374 return self.select_rfds_results.pop(0), [], [] 375 376 def _make_mock_fork(self, pid): 377 def mock_fork(): 378 return (pid, 12) 379 return mock_fork 380 381 def _mock_tcsetattr(self, fileno, opt, mode): 382 self.tcsetattr_mode_setting = mode 383 384 def test__copy_to_each(self): 385 """Test the normal data case on both master_fd and stdin.""" 386 read_from_stdout_fd, mock_stdout_fd = self._pipe() 387 pty.STDOUT_FILENO = mock_stdout_fd 388 mock_stdin_fd, write_to_stdin_fd = self._pipe() 389 pty.STDIN_FILENO = mock_stdin_fd 390 socketpair = self._socketpair() 391 masters = [s.fileno() for s in socketpair] 392 393 # Feed data. Smaller than PIPEBUF. These writes will not block. 394 os.write(masters[1], b'from master') 395 os.write(write_to_stdin_fd, b'from stdin') 396 397 # Expect two select calls, the last one will cause IndexError 398 pty.select = self._mock_select 399 self.select_rfds_lengths.append(2) 400 self.select_rfds_results.append([mock_stdin_fd, masters[0]]) 401 self.select_rfds_lengths.append(2) 402 403 with self.assertRaises(IndexError): 404 pty._copy(masters[0]) 405 406 # Test that the right data went to the right places. 407 rfds = select.select([read_from_stdout_fd, masters[1]], [], [], 0)[0] 408 self.assertEqual([read_from_stdout_fd, masters[1]], rfds) 409 self.assertEqual(os.read(read_from_stdout_fd, 20), b'from master') 410 self.assertEqual(os.read(masters[1], 20), b'from stdin') 411 412 def test__copy_eof_on_all(self): 413 """Test the empty read EOF case on both master_fd and stdin.""" 414 read_from_stdout_fd, mock_stdout_fd = self._pipe() 415 pty.STDOUT_FILENO = mock_stdout_fd 416 mock_stdin_fd, write_to_stdin_fd = self._pipe() 417 pty.STDIN_FILENO = mock_stdin_fd 418 socketpair = self._socketpair() 419 masters = [s.fileno() for s in socketpair] 420 421 socketpair[1].close() 422 os.close(write_to_stdin_fd) 423 424 pty.select = self._mock_select 425 self.select_rfds_lengths.append(2) 426 self.select_rfds_results.append([mock_stdin_fd, masters[0]]) 427 # We expect that both fds were removed from the fds list as they 428 # both encountered an EOF before the second select call. 429 self.select_rfds_lengths.append(0) 430 431 # We expect the function to return without error. 432 self.assertEqual(pty._copy(masters[0]), None) 433 434 def test__restore_tty_mode_normal_return(self): 435 """Test that spawn resets the tty mode no when _copy returns normally.""" 436 437 # PID 1 is returned from mocked fork to run the parent branch 438 # of code 439 pty.fork = self._make_mock_fork(1) 440 441 status_sentinel = object() 442 pty.waitpid = lambda _1, _2: [None, status_sentinel] 443 pty.close = lambda _: None 444 445 pty._copy = lambda _1, _2, _3: None 446 447 mode_sentinel = object() 448 pty.tcgetattr = lambda fd: mode_sentinel 449 pty.tcsetattr = self._mock_tcsetattr 450 pty.setraw = lambda _: None 451 452 self.assertEqual(pty.spawn([]), status_sentinel, "pty.waitpid process status not returned by pty.spawn") 453 self.assertEqual(self.tcsetattr_mode_setting, mode_sentinel, "pty.tcsetattr not called with original mode value") 454 455 456def tearDownModule(): 457 reap_children() 458 459 460if __name__ == "__main__": 461 unittest.main() 462