• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Pseudo terminal utilities."""
2
3# Bugs: No signal handling.  Doesn't set slave termios and window size.
4#       Only tested on Linux, FreeBSD, and macOS.
5# See:  W. Richard Stevens. 1992.  Advanced Programming in the
6#       UNIX Environment.  Chapter 19.
7# Author: Steen Lumholt -- with additions by Guido.
8
9from select import select
10import os
11import sys
12import tty
13
14# names imported directly for test mocking purposes
15from os import close, waitpid
16from tty import setraw, tcgetattr, tcsetattr
17
18__all__ = ["openpty", "fork", "spawn"]
19
20STDIN_FILENO = 0
21STDOUT_FILENO = 1
22STDERR_FILENO = 2
23
24CHILD = 0
25
26def openpty():
27    """openpty() -> (master_fd, slave_fd)
28    Open a pty master/slave pair, using os.openpty() if possible."""
29
30    try:
31        return os.openpty()
32    except (AttributeError, OSError):
33        pass
34    master_fd, slave_name = _open_terminal()
35    slave_fd = slave_open(slave_name)
36    return master_fd, slave_fd
37
38def master_open():
39    """master_open() -> (master_fd, slave_name)
40    Open a pty master and return the fd, and the filename of the slave end.
41    Deprecated, use openpty() instead."""
42
43    try:
44        master_fd, slave_fd = os.openpty()
45    except (AttributeError, OSError):
46        pass
47    else:
48        slave_name = os.ttyname(slave_fd)
49        os.close(slave_fd)
50        return master_fd, slave_name
51
52    return _open_terminal()
53
54def _open_terminal():
55    """Open pty master and return (master_fd, tty_name)."""
56    for x in 'pqrstuvwxyzPQRST':
57        for y in '0123456789abcdef':
58            pty_name = '/dev/pty' + x + y
59            try:
60                fd = os.open(pty_name, os.O_RDWR)
61            except OSError:
62                continue
63            return (fd, '/dev/tty' + x + y)
64    raise OSError('out of pty devices')
65
66def slave_open(tty_name):
67    """slave_open(tty_name) -> slave_fd
68    Open the pty slave and acquire the controlling terminal, returning
69    opened filedescriptor.
70    Deprecated, use openpty() instead."""
71
72    result = os.open(tty_name, os.O_RDWR)
73    try:
74        from fcntl import ioctl, I_PUSH
75    except ImportError:
76        return result
77    try:
78        ioctl(result, I_PUSH, "ptem")
79        ioctl(result, I_PUSH, "ldterm")
80    except OSError:
81        pass
82    return result
83
84def fork():
85    """fork() -> (pid, master_fd)
86    Fork and make the child a session leader with a controlling terminal."""
87
88    try:
89        pid, fd = os.forkpty()
90    except (AttributeError, OSError):
91        pass
92    else:
93        if pid == CHILD:
94            try:
95                os.setsid()
96            except OSError:
97                # os.forkpty() already set us session leader
98                pass
99        return pid, fd
100
101    master_fd, slave_fd = openpty()
102    pid = os.fork()
103    if pid == CHILD:
104        # Establish a new session.
105        os.setsid()
106        os.close(master_fd)
107
108        # Slave becomes stdin/stdout/stderr of child.
109        os.dup2(slave_fd, STDIN_FILENO)
110        os.dup2(slave_fd, STDOUT_FILENO)
111        os.dup2(slave_fd, STDERR_FILENO)
112        if slave_fd > STDERR_FILENO:
113            os.close(slave_fd)
114
115        # Explicitly open the tty to make it become a controlling tty.
116        tmp_fd = os.open(os.ttyname(STDOUT_FILENO), os.O_RDWR)
117        os.close(tmp_fd)
118    else:
119        os.close(slave_fd)
120
121    # Parent and child process.
122    return pid, master_fd
123
124def _writen(fd, data):
125    """Write all the data to a descriptor."""
126    while data:
127        n = os.write(fd, data)
128        data = data[n:]
129
130def _read(fd):
131    """Default read function."""
132    return os.read(fd, 1024)
133
134def _copy(master_fd, master_read=_read, stdin_read=_read):
135    """Parent copy loop.
136    Copies
137            pty master -> standard output   (master_read)
138            standard input -> pty master    (stdin_read)"""
139    fds = [master_fd, STDIN_FILENO]
140    while fds:
141        rfds, _wfds, _xfds = select(fds, [], [])
142
143        if master_fd in rfds:
144            # Some OSes signal EOF by returning an empty byte string,
145            # some throw OSErrors.
146            try:
147                data = master_read(master_fd)
148            except OSError:
149                data = b""
150            if not data:  # Reached EOF.
151                return    # Assume the child process has exited and is
152                          # unreachable, so we clean up.
153            else:
154                os.write(STDOUT_FILENO, data)
155
156        if STDIN_FILENO in rfds:
157            data = stdin_read(STDIN_FILENO)
158            if not data:
159                fds.remove(STDIN_FILENO)
160            else:
161                _writen(master_fd, data)
162
163def spawn(argv, master_read=_read, stdin_read=_read):
164    """Create a spawned process."""
165    if type(argv) == type(''):
166        argv = (argv,)
167    sys.audit('pty.spawn', argv)
168
169    pid, master_fd = fork()
170    if pid == CHILD:
171        os.execlp(argv[0], *argv)
172
173    try:
174        mode = tcgetattr(STDIN_FILENO)
175        setraw(STDIN_FILENO)
176        restore = True
177    except tty.error:    # This is the same as termios.error
178        restore = False
179
180    try:
181        _copy(master_fd, master_read, stdin_read)
182    finally:
183        if restore:
184            tcsetattr(STDIN_FILENO, tty.TCSAFLUSH, mode)
185
186    close(master_fd)
187    return waitpid(pid, 0)[1]
188