1__all__ = 'create_subprocess_exec', 'create_subprocess_shell' 2 3import subprocess 4 5from . import events 6from . import protocols 7from . import streams 8from . import tasks 9from .log import logger 10 11 12PIPE = subprocess.PIPE 13STDOUT = subprocess.STDOUT 14DEVNULL = subprocess.DEVNULL 15 16 17class SubprocessStreamProtocol(streams.FlowControlMixin, 18 protocols.SubprocessProtocol): 19 """Like StreamReaderProtocol, but for a subprocess.""" 20 21 def __init__(self, limit, loop): 22 super().__init__(loop=loop) 23 self._limit = limit 24 self.stdin = self.stdout = self.stderr = None 25 self._transport = None 26 self._process_exited = False 27 self._pipe_fds = [] 28 self._stdin_closed = self._loop.create_future() 29 30 def __repr__(self): 31 info = [self.__class__.__name__] 32 if self.stdin is not None: 33 info.append(f'stdin={self.stdin!r}') 34 if self.stdout is not None: 35 info.append(f'stdout={self.stdout!r}') 36 if self.stderr is not None: 37 info.append(f'stderr={self.stderr!r}') 38 return '<{}>'.format(' '.join(info)) 39 40 def connection_made(self, transport): 41 self._transport = transport 42 43 stdout_transport = transport.get_pipe_transport(1) 44 if stdout_transport is not None: 45 self.stdout = streams.StreamReader(limit=self._limit, 46 loop=self._loop) 47 self.stdout.set_transport(stdout_transport) 48 self._pipe_fds.append(1) 49 50 stderr_transport = transport.get_pipe_transport(2) 51 if stderr_transport is not None: 52 self.stderr = streams.StreamReader(limit=self._limit, 53 loop=self._loop) 54 self.stderr.set_transport(stderr_transport) 55 self._pipe_fds.append(2) 56 57 stdin_transport = transport.get_pipe_transport(0) 58 if stdin_transport is not None: 59 self.stdin = streams.StreamWriter(stdin_transport, 60 protocol=self, 61 reader=None, 62 loop=self._loop) 63 64 def pipe_data_received(self, fd, data): 65 if fd == 1: 66 reader = self.stdout 67 elif fd == 2: 68 reader = self.stderr 69 else: 70 reader = None 71 if reader is not None: 72 reader.feed_data(data) 73 74 def pipe_connection_lost(self, fd, exc): 75 if fd == 0: 76 pipe = self.stdin 77 if pipe is not None: 78 pipe.close() 79 self.connection_lost(exc) 80 if exc is None: 81 self._stdin_closed.set_result(None) 82 else: 83 self._stdin_closed.set_exception(exc) 84 return 85 if fd == 1: 86 reader = self.stdout 87 elif fd == 2: 88 reader = self.stderr 89 else: 90 reader = None 91 if reader is not None: 92 if exc is None: 93 reader.feed_eof() 94 else: 95 reader.set_exception(exc) 96 97 if fd in self._pipe_fds: 98 self._pipe_fds.remove(fd) 99 self._maybe_close_transport() 100 101 def process_exited(self): 102 self._process_exited = True 103 self._maybe_close_transport() 104 105 def _maybe_close_transport(self): 106 if len(self._pipe_fds) == 0 and self._process_exited: 107 self._transport.close() 108 self._transport = None 109 110 def _get_close_waiter(self, stream): 111 if stream is self.stdin: 112 return self._stdin_closed 113 114 115class Process: 116 def __init__(self, transport, protocol, loop): 117 self._transport = transport 118 self._protocol = protocol 119 self._loop = loop 120 self.stdin = protocol.stdin 121 self.stdout = protocol.stdout 122 self.stderr = protocol.stderr 123 self.pid = transport.get_pid() 124 125 def __repr__(self): 126 return f'<{self.__class__.__name__} {self.pid}>' 127 128 @property 129 def returncode(self): 130 return self._transport.get_returncode() 131 132 async def wait(self): 133 """Wait until the process exit and return the process return code.""" 134 return await self._transport._wait() 135 136 def send_signal(self, signal): 137 self._transport.send_signal(signal) 138 139 def terminate(self): 140 self._transport.terminate() 141 142 def kill(self): 143 self._transport.kill() 144 145 async def _feed_stdin(self, input): 146 debug = self._loop.get_debug() 147 self.stdin.write(input) 148 if debug: 149 logger.debug( 150 '%r communicate: feed stdin (%s bytes)', self, len(input)) 151 try: 152 await self.stdin.drain() 153 except (BrokenPipeError, ConnectionResetError) as exc: 154 # communicate() ignores BrokenPipeError and ConnectionResetError 155 if debug: 156 logger.debug('%r communicate: stdin got %r', self, exc) 157 158 if debug: 159 logger.debug('%r communicate: close stdin', self) 160 self.stdin.close() 161 162 async def _noop(self): 163 return None 164 165 async def _read_stream(self, fd): 166 transport = self._transport.get_pipe_transport(fd) 167 if fd == 2: 168 stream = self.stderr 169 else: 170 assert fd == 1 171 stream = self.stdout 172 if self._loop.get_debug(): 173 name = 'stdout' if fd == 1 else 'stderr' 174 logger.debug('%r communicate: read %s', self, name) 175 output = await stream.read() 176 if self._loop.get_debug(): 177 name = 'stdout' if fd == 1 else 'stderr' 178 logger.debug('%r communicate: close %s', self, name) 179 transport.close() 180 return output 181 182 async def communicate(self, input=None): 183 if input is not None: 184 stdin = self._feed_stdin(input) 185 else: 186 stdin = self._noop() 187 if self.stdout is not None: 188 stdout = self._read_stream(1) 189 else: 190 stdout = self._noop() 191 if self.stderr is not None: 192 stderr = self._read_stream(2) 193 else: 194 stderr = self._noop() 195 stdin, stdout, stderr = await tasks.gather(stdin, stdout, stderr) 196 await self.wait() 197 return (stdout, stderr) 198 199 200async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None, 201 limit=streams._DEFAULT_LIMIT, **kwds): 202 loop = events.get_running_loop() 203 protocol_factory = lambda: SubprocessStreamProtocol(limit=limit, 204 loop=loop) 205 transport, protocol = await loop.subprocess_shell( 206 protocol_factory, 207 cmd, stdin=stdin, stdout=stdout, 208 stderr=stderr, **kwds) 209 return Process(transport, protocol, loop) 210 211 212async def create_subprocess_exec(program, *args, stdin=None, stdout=None, 213 stderr=None, limit=streams._DEFAULT_LIMIT, 214 **kwds): 215 loop = events.get_running_loop() 216 protocol_factory = lambda: SubprocessStreamProtocol(limit=limit, 217 loop=loop) 218 transport, protocol = await loop.subprocess_exec( 219 protocol_factory, 220 program, *args, 221 stdin=stdin, stdout=stdout, 222 stderr=stderr, **kwds) 223 return Process(transport, protocol, loop) 224