• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# Public API of this module: only the two coroutine factory functions are
# listed here.
__all__ = ['create_subprocess_exec', 'create_subprocess_shell']
2
3import subprocess
4
5from . import events
6from . import protocols
7from . import streams
8from . import tasks
9from .coroutines import coroutine
10from .log import logger
11
12
# Re-export the special stdin/stdout/stderr argument values of the standard
# ``subprocess`` module under this module's namespace.
PIPE = subprocess.PIPE
STDOUT = subprocess.STDOUT
DEVNULL = subprocess.DEVNULL
16
17
class SubprocessStreamProtocol(streams.FlowControlMixin,
                               protocols.SubprocessProtocol):
    """Like StreamReaderProtocol, but for a subprocess.

    Once connected, the child's piped standard streams are exposed as
    stream objects: ``stdout`` and ``stderr`` become
    ``streams.StreamReader`` instances and ``stdin`` becomes a
    ``streams.StreamWriter``.  A stream for which the transport reports
    no pipe stays ``None``.  The subprocess transport is closed once the
    process has exited and every tracked output pipe has been lost.
    """

    def __init__(self, limit, loop):
        """Set up an unconnected protocol.

        limit -- buffer limit passed to each StreamReader created in
                 connection_made().
        loop  -- event loop forwarded to the base class initialiser.
        """
        super().__init__(loop=loop)
        self._limit = limit
        self.stdin = self.stdout = self.stderr = None
        self._transport = None
        self._process_exited = False
        # File descriptors (1 and/or 2) of output pipes that are still open.
        self._pipe_fds = []

    def __repr__(self):
        """Return '<ClassName ...>' listing only the streams that exist."""
        info = [self.__class__.__name__]
        if self.stdin is not None:
            info.append('stdin=%r' % self.stdin)
        if self.stdout is not None:
            info.append('stdout=%r' % self.stdout)
        if self.stderr is not None:
            info.append('stderr=%r' % self.stderr)
        return '<%s>' % ' '.join(info)

    def connection_made(self, transport):
        """Remember *transport* and create a stream object per pipe.

        Pipes 1 (stdout) and 2 (stderr) get a StreamReader and are
        recorded in ``_pipe_fds``; pipe 0 (stdin) gets a StreamWriter.
        """
        self._transport = transport

        # NOTE(review): self._loop is not assigned in this class; it is
        # presumably set by the base class initialiser -- confirm.
        stdout_transport = transport.get_pipe_transport(1)
        if stdout_transport is not None:
            self.stdout = streams.StreamReader(limit=self._limit,
                                               loop=self._loop)
            self.stdout.set_transport(stdout_transport)
            self._pipe_fds.append(1)

        stderr_transport = transport.get_pipe_transport(2)
        if stderr_transport is not None:
            self.stderr = streams.StreamReader(limit=self._limit,
                                               loop=self._loop)
            self.stderr.set_transport(stderr_transport)
            self._pipe_fds.append(2)

        stdin_transport = transport.get_pipe_transport(0)
        if stdin_transport is not None:
            self.stdin = streams.StreamWriter(stdin_transport,
                                              protocol=self,
                                              reader=None,
                                              loop=self._loop)

    def _get_reader(self, fd):
        """Return the StreamReader for output pipe *fd*, or None."""
        if fd == 1:
            return self.stdout
        if fd == 2:
            return self.stderr
        return None

    def pipe_data_received(self, fd, data):
        """Feed *data* to the reader of pipe *fd*; ignore unknown pipes."""
        reader = self._get_reader(fd)
        if reader is not None:
            reader.feed_data(data)

    def pipe_connection_lost(self, fd, exc):
        """Handle the loss of pipe *fd*.

        For stdin (fd 0) the writer is closed and connection_lost() is
        invoked with *exc*.  For an output pipe the reader receives EOF
        when *exc* is None and the exception otherwise; the pipe is then
        dropped from ``_pipe_fds`` and the transport is closed if nothing
        else is pending.
        """
        if fd == 0:
            pipe = self.stdin
            if pipe is not None:
                pipe.close()
            # connection_lost() is inherited, not defined in this class;
            # presumably it releases writers blocked in drain() -- confirm.
            self.connection_lost(exc)
            return

        reader = self._get_reader(fd)
        # Identity test: never route a None check through a custom __ne__.
        if reader is not None:
            if exc is None:
                reader.feed_eof()
            else:
                reader.set_exception(exc)

        if fd in self._pipe_fds:
            self._pipe_fds.remove(fd)
        self._maybe_close_transport()

    def process_exited(self):
        """Record that the process exited and close the transport if due."""
        self._process_exited = True
        self._maybe_close_transport()

    def _maybe_close_transport(self):
        """Close the transport once the process exited and no pipe is left.

        Safe to call repeatedly: after the transport has been closed and
        forgotten, further calls do nothing instead of failing on None.
        """
        if self._pipe_fds or not self._process_exited:
            return
        if self._transport is None:
            # Already closed by an earlier notification.
            return
        self._transport.close()
        self._transport = None
105
106
class Process:
    """Handle on a subprocess, built from a transport/protocol pair.

    The ``stdin``, ``stdout`` and ``stderr`` attributes are copied from
    the protocol when the object is created, and ``pid`` is read from the
    transport.  Signal delivery and the return code are delegated to the
    transport.
    """

    def __init__(self, transport, protocol, loop):
        self._transport = transport
        self._protocol = protocol
        self._loop = loop
        # Snapshot the protocol's stream objects (each may be None).
        self.stdin, self.stdout, self.stderr = (
            protocol.stdin, protocol.stdout, protocol.stderr)
        self.pid = transport.get_pid()

    def __repr__(self):
        class_name = self.__class__.__name__
        return '<%s %s>' % (class_name, self.pid)

    @property
    def returncode(self):
        """Return code as currently reported by the transport."""
        return self._transport.get_returncode()

    @coroutine
    def wait(self):
        """Wait until the process exit and return the process return code.

        This method is a coroutine."""
        returncode = yield from self._transport._wait()
        return returncode

    def send_signal(self, signal):
        """Forward *signal* to the transport's send_signal()."""
        self._transport.send_signal(signal)

    def terminate(self):
        """Delegate to the transport's terminate()."""
        self._transport.terminate()

    def kill(self):
        """Delegate to the transport's kill()."""
        self._transport.kill()

    @coroutine
    def _feed_stdin(self, data):
        """Write *data* to stdin, wait for it to drain, then close stdin.

        BrokenPipeError and ConnectionResetError raised while draining
        are swallowed (and logged when the loop is in debug mode).
        """
        debug_enabled = self._loop.get_debug()
        self.stdin.write(data)
        if debug_enabled:
            logger.debug('%r communicate: feed stdin (%s bytes)',
                        self, len(data))
        try:
            yield from self.stdin.drain()
        except (BrokenPipeError, ConnectionResetError) as err:
            # communicate() ignores BrokenPipeError and ConnectionResetError
            if debug_enabled:
                logger.debug('%r communicate: stdin got %r', self, err)

        if debug_enabled:
            logger.debug('%r communicate: close stdin', self)
        self.stdin.close()

    @coroutine
    def _noop(self):
        """Placeholder for a stream that needs no work; yields None."""
        return None

    @coroutine
    def _read_stream(self, fd):
        """Read pipe *fd* (1 or 2) to the end, close it, return the data."""
        pipe_transport = self._transport.get_pipe_transport(fd)
        assert fd in (1, 2)
        reader = self.stderr if fd == 2 else self.stdout
        label = 'stdout' if fd == 1 else 'stderr'
        if self._loop.get_debug():
            logger.debug('%r communicate: read %s', self, label)
        data = yield from reader.read()
        if self._loop.get_debug():
            logger.debug('%r communicate: close %s', self, label)
        pipe_transport.close()
        return data

    @coroutine
    def communicate(self, input=None):
        """Feed *input* to stdin, collect stdout/stderr, wait for exit.

        Returns a ``(stdout, stderr)`` tuple; an entry is None when the
        corresponding stream attribute is None.  stdin is only touched
        when *input* is not None.

        This method is a coroutine.
        """
        feed = self._noop() if input is None else self._feed_stdin(input)
        read_out = (self._noop() if self.stdout is None
                    else self._read_stream(1))
        read_err = (self._noop() if self.stderr is None
                    else self._read_stream(2))
        # The three jobs run concurrently; the stdin result is discarded.
        _, out_data, err_data = yield from tasks.gather(
            feed, read_out, read_err, loop=self._loop)
        yield from self.wait()
        return (out_data, err_data)
198
199
@coroutine
def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
                            loop=None, limit=streams._DEFAULT_LIMIT, **kwds):
    """Start *cmd* through loop.subprocess_shell() and return a Process.

    This function is a coroutine.

    cmd    -- command forwarded unchanged to loop.subprocess_shell().
    stdin, stdout, stderr, **kwds -- forwarded unchanged as keywords.
    loop   -- event loop to use; events.get_event_loop() when None.
    limit  -- limit given to the SubprocessStreamProtocol that is created.
    """
    if loop is None:
        loop = events.get_event_loop()

    def make_protocol():
        # Called by the loop to obtain the protocol for the new subprocess.
        return SubprocessStreamProtocol(limit=limit, loop=loop)

    transport, protocol = yield from loop.subprocess_shell(
        make_protocol, cmd,
        stdin=stdin, stdout=stdout, stderr=stderr, **kwds)
    return Process(transport, protocol, loop)
212
@coroutine
def create_subprocess_exec(program, *args, stdin=None, stdout=None,
                           stderr=None, loop=None,
                           limit=streams._DEFAULT_LIMIT, **kwds):
    """Start *program* through loop.subprocess_exec() and return a Process.

    This function is a coroutine.

    program, *args -- forwarded unchanged to loop.subprocess_exec().
    stdin, stdout, stderr, **kwds -- forwarded unchanged as keywords.
    loop   -- event loop to use; events.get_event_loop() when None.
    limit  -- limit given to the SubprocessStreamProtocol that is created.
    """
    if loop is None:
        loop = events.get_event_loop()

    def make_protocol():
        # Called by the loop to obtain the protocol for the new subprocess.
        return SubprocessStreamProtocol(limit=limit, loop=loop)

    transport, protocol = yield from loop.subprocess_exec(
        make_protocol, program, *args,
        stdin=stdin, stdout=stdout, stderr=stderr, **kwds)
    return Process(transport, protocol, loop)
227