• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# Names exported by ``from <this module> import *``: the two coroutine
# factories defined at the bottom of this file.
__all__ = 'create_subprocess_exec', 'create_subprocess_shell'
2
3import subprocess
4
5from . import events
6from . import protocols
7from . import streams
8from . import tasks
9from .log import logger
10
11
# Aliases of the standard-library ``subprocess`` special values, exposed at
# this module's level.
PIPE = subprocess.PIPE
STDOUT = subprocess.STDOUT
DEVNULL = subprocess.DEVNULL
15
16
class SubprocessStreamProtocol(streams.FlowControlMixin,
                               protocols.SubprocessProtocol):
    """Like StreamReaderProtocol, but for a subprocess.

    Once connected, exposes the child's piped standard streams as
    ``stdin`` (a StreamWriter) and ``stdout`` / ``stderr`` (StreamReaders).
    A stream that is not piped stays ``None``.  The subprocess transport
    is closed once the process has exited and both read pipes are gone.
    """

    def __init__(self, limit, loop):
        """Create an unconnected protocol.

        *limit* is forwarded to every StreamReader created later;
        *loop* is forwarded to the base-class initializer.
        """
        super().__init__(loop=loop)
        self._limit = limit
        self.stdin = self.stdout = self.stderr = None
        self._transport = None
        self._process_exited = False
        # Read-side descriptors (1 and/or 2) whose pipe is still connected.
        self._pipe_fds = []
        # Completed by pipe_connection_lost(0, ...); handed out through
        # _get_close_waiter() for the stdin writer.
        # NOTE(review): self._loop is presumably set by the base-class
        # __init__ called above -- it is not assigned in this class.
        self._stdin_closed = self._loop.create_future()

    def __repr__(self):
        info = [self.__class__.__name__]
        if self.stdin is not None:
            info.append(f'stdin={self.stdin!r}')
        if self.stdout is not None:
            info.append(f'stdout={self.stdout!r}')
        if self.stderr is not None:
            info.append(f'stderr={self.stderr!r}')
        return '<{}>'.format(' '.join(info))

    def connection_made(self, transport):
        """Remember *transport* and wrap each available pipe in a stream."""
        self._transport = transport

        stdout_transport = transport.get_pipe_transport(1)
        if stdout_transport is not None:
            self.stdout = streams.StreamReader(limit=self._limit,
                                               loop=self._loop)
            self.stdout.set_transport(stdout_transport)
            self._pipe_fds.append(1)

        stderr_transport = transport.get_pipe_transport(2)
        if stderr_transport is not None:
            self.stderr = streams.StreamReader(limit=self._limit,
                                               loop=self._loop)
            self.stderr.set_transport(stderr_transport)
            self._pipe_fds.append(2)

        stdin_transport = transport.get_pipe_transport(0)
        if stdin_transport is not None:
            self.stdin = streams.StreamWriter(stdin_transport,
                                              protocol=self,
                                              reader=None,
                                              loop=self._loop)

    def pipe_data_received(self, fd, data):
        """Feed *data* to the reader for *fd*; other descriptors are ignored."""
        if fd == 1:
            reader = self.stdout
        elif fd == 2:
            reader = self.stderr
        else:
            reader = None
        if reader is not None:
            reader.feed_data(data)

    def pipe_connection_lost(self, fd, exc):
        """Handle the loss of the pipe *fd*; *exc* is the error or None."""
        if fd == 0:
            pipe = self.stdin
            if pipe is not None:
                pipe.close()
            self.connection_lost(exc)
            if exc is None:
                self._stdin_closed.set_result(None)
            else:
                self._stdin_closed.set_exception(exc)
                # Awaiting the close waiter is optional for callers, so an
                # unretrieved exception here must not be reported as
                # "Future exception was never retrieved".
                self._stdin_closed._log_traceback = False
            return
        if fd == 1:
            reader = self.stdout
        elif fd == 2:
            reader = self.stderr
        else:
            reader = None
        if reader is not None:
            if exc is None:
                reader.feed_eof()
            else:
                reader.set_exception(exc)

        if fd in self._pipe_fds:
            self._pipe_fds.remove(fd)
        self._maybe_close_transport()

    def process_exited(self):
        """Record that the child exited and close the transport if possible."""
        self._process_exited = True
        self._maybe_close_transport()

    def _maybe_close_transport(self):
        # Close only when no read pipe is still connected AND the process
        # has exited.
        if not self._pipe_fds and self._process_exited:
            self._transport.close()
            self._transport = None

    def _get_close_waiter(self, stream):
        """Return the stdin-closed future for the stdin writer, else None."""
        if stream is self.stdin:
            return self._stdin_closed
        return None
113
114
class Process:
    """Handle for a running subprocess, built from a transport/protocol pair.

    ``stdin``, ``stdout`` and ``stderr`` are taken from the protocol and
    are ``None`` for streams the protocol did not set up.  ``pid`` is
    read from the transport at construction time.
    """

    def __init__(self, transport, protocol, loop):
        self._transport = transport
        self._protocol = protocol
        self._loop = loop
        self.stdin = protocol.stdin
        self.stdout = protocol.stdout
        self.stderr = protocol.stderr
        self.pid = transport.get_pid()

    def __repr__(self):
        return f'<{self.__class__.__name__} {self.pid}>'

    @property
    def returncode(self):
        """Return code as reported by the transport's get_returncode()."""
        return self._transport.get_returncode()

    async def wait(self):
        """Wait until the process exits and return the process return code."""
        return await self._transport._wait()

    def send_signal(self, signal):
        """Forward *signal* to the transport's send_signal()."""
        self._transport.send_signal(signal)

    def terminate(self):
        """Forward to the transport's terminate()."""
        self._transport.terminate()

    def kill(self):
        """Forward to the transport's kill()."""
        self._transport.kill()

    async def _feed_stdin(self, input):
        """Write *input* (if not None) to stdin, drain it, then close stdin.

        BrokenPipeError and ConnectionResetError raised while writing or
        draining are ignored; stdin is closed in every case.
        """
        debug = self._loop.get_debug()
        try:
            if input is not None:
                self.stdin.write(input)
                if debug:
                    logger.debug(
                        '%r communicate: feed stdin (%s bytes)',
                        self, len(input))

            await self.stdin.drain()
        except (BrokenPipeError, ConnectionResetError) as exc:
            # communicate() ignores BrokenPipeError and ConnectionResetError;
            # both write() and drain() can raise them.
            if debug:
                logger.debug('%r communicate: stdin got %r', self, exc)

        if debug:
            logger.debug('%r communicate: close stdin', self)
        self.stdin.close()

    async def _noop(self):
        """Placeholder coroutine for a stream that is not piped."""
        return None

    async def _read_stream(self, fd):
        """Read stdout (fd 1) or stderr (fd 2) to EOF, then close its pipe."""
        transport = self._transport.get_pipe_transport(fd)
        if fd == 2:
            stream = self.stderr
        else:
            assert fd == 1
            stream = self.stdout
        if self._loop.get_debug():
            name = 'stdout' if fd == 1 else 'stderr'
            logger.debug('%r communicate: read %s', self, name)
        output = await stream.read()
        if self._loop.get_debug():
            name = 'stdout' if fd == 1 else 'stderr'
            logger.debug('%r communicate: close %s', self, name)
        transport.close()
        return output

    async def communicate(self, input=None):
        """Feed stdin, collect stdout/stderr, and wait for the process.

        If stdin is piped, *input* (when not None) is written to it and
        stdin is then closed -- also when *input* is None, so a child that
        reads until EOF is not left blocked.  stdout and stderr are read
        concurrently until EOF.

        Returns a ``(stdout, stderr)`` tuple; an entry is ``None`` when
        the corresponding stream is not piped.
        """
        if self.stdin is not None:
            stdin = self._feed_stdin(input)
        else:
            stdin = self._noop()
        if self.stdout is not None:
            stdout = self._read_stream(1)
        else:
            stdout = self._noop()
        if self.stderr is not None:
            stderr = self._read_stream(2)
        else:
            stderr = self._noop()
        stdin, stdout, stderr = await tasks.gather(stdin, stdout, stderr)
        await self.wait()
        return (stdout, stderr)
198
199
async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
                                  limit=streams._DEFAULT_LIMIT, **kwds):
    """Start *cmd* through the running loop's subprocess_shell().

    *limit* is passed to the SubprocessStreamProtocol created for the
    child; *stdin*, *stdout*, *stderr* and any extra keyword arguments are
    forwarded to the loop unchanged.  Returns a Process wrapping the
    resulting transport and protocol.
    """
    running_loop = events.get_running_loop()

    def make_protocol():
        return SubprocessStreamProtocol(limit=limit, loop=running_loop)

    transport, protocol = await running_loop.subprocess_shell(
        make_protocol, cmd,
        stdin=stdin, stdout=stdout, stderr=stderr,
        **kwds)
    return Process(transport, protocol, running_loop)
210
211
async def create_subprocess_exec(program, *args, stdin=None, stdout=None,
                                 stderr=None, limit=streams._DEFAULT_LIMIT,
                                 **kwds):
    """Start *program* with *args* through the running loop's subprocess_exec().

    *limit* is passed to the SubprocessStreamProtocol created for the
    child; *stdin*, *stdout*, *stderr* and any extra keyword arguments are
    forwarded to the loop unchanged.  Returns a Process wrapping the
    resulting transport and protocol.
    """
    running_loop = events.get_running_loop()

    def make_protocol():
        return SubprocessStreamProtocol(limit=limit, loop=running_loop)

    transport, protocol = await running_loop.subprocess_exec(
        make_protocol, program, *args,
        stdin=stdin, stdout=stdout, stderr=stderr,
        **kwds)
    return Process(transport, protocol, running_loop)
224