import collections
import subprocess
import warnings

from . import protocols
from . import transports
from .log import logger


class BaseSubprocessTransport(transports.SubprocessTransport):
    """Common implementation of a subprocess transport.

    Subclasses must implement _start() to create the child process and
    store it in self._proc.  This class tracks the pipes requested with
    subprocess.PIPE, forwards pipe/process events to the user protocol and
    calls the protocol's connection_lost() once the process has exited and
    every pipe has been disconnected.
    """

    def __init__(self, loop, protocol, args, shell,
                 stdin, stdout, stderr, bufsize,
                 waiter=None, extra=None, **kwargs):
        """Start the child process and schedule the connection of its pipes.

        loop is the event loop used for scheduling, protocol receives the
        subprocess events.  args, shell, stdin, stdout, stderr, bufsize and
        **kwargs are forwarded unchanged to _start().  waiter, if given, is
        a future completed by _connect_pipes(): with None on success or
        with the exception on failure.  extra is passed to the base
        transport constructor.
        """
        super().__init__(extra)
        self._closed = False
        self._protocol = protocol
        self._loop = loop
        self._proc = None
        self._pid = None
        self._returncode = None
        # Futures created by _wait(), resolved by _process_exited().
        self._exit_waiters = []
        # Protocol callbacks queued until _connect_pipes() has scheduled
        # connection_made(); set to None afterwards (see _call()).
        self._pending_calls = collections.deque()
        # Maps fd (0, 1, 2) to its pipe protocol; None until connected.
        self._pipes = {}
        self._finished = False

        if stdin == subprocess.PIPE:
            self._pipes[0] = None
        if stdout == subprocess.PIPE:
            self._pipes[1] = None
        if stderr == subprocess.PIPE:
            self._pipes[2] = None

        # Create the child process: set the _proc attribute
        try:
            self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
                        stderr=stderr, bufsize=bufsize, **kwargs)
        except:
            # Whatever went wrong, mark the transport closed before
            # propagating the error unchanged.
            self.close()
            raise

        self._pid = self._proc.pid
        self._extra['subprocess'] = self._proc

        if self._loop.get_debug():
            if isinstance(args, (bytes, str)):
                program = args
            else:
                program = args[0]
            logger.debug('process %r created: pid %s',
                         program, self._pid)

        self._loop.create_task(self._connect_pipes(waiter))

    def __repr__(self):
        """Return a debug representation: state, pid, returncode, pipes."""
        info = [self.__class__.__name__]
        if self._closed:
            info.append('closed')
        if self._pid is not None:
            info.append(f'pid={self._pid}')
        if self._returncode is not None:
            info.append(f'returncode={self._returncode}')
        elif self._pid is not None:
            info.append('running')
        else:
            info.append('not started')

        stdin = self._pipes.get(0)
        if stdin is not None:
            info.append(f'stdin={stdin.pipe}')

        stdout = self._pipes.get(1)
        stderr = self._pipes.get(2)
        if stdout is not None and stderr is stdout:
            info.append(f'stdout=stderr={stdout.pipe}')
        else:
            if stdout is not None:
                info.append(f'stdout={stdout.pipe}')
            if stderr is not None:
                info.append(f'stderr={stderr.pipe}')

        return '<{}>'.format(' '.join(info))

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Create the child process and set self._proc (subclass hook)."""
        raise NotImplementedError

    def set_protocol(self, protocol):
        """Replace the protocol that receives subprocess events."""
        self._protocol = protocol

    def get_protocol(self):
        """Return the protocol currently receiving subprocess events."""
        return self._protocol

    def is_closing(self):
        """Return True once close() has been called."""
        return self._closed

    def close(self):
        """Close the connected pipes and kill the child if still running.

        Calling close() more than once is a no-op.
        """
        if self._closed:
            return
        self._closed = True

        for proto in self._pipes.values():
            if proto is None:
                # Pipe was requested but has not been connected.
                continue
            proto.pipe.close()

        if (self._proc is not None and
                # has the child process finished?
                self._returncode is None and
                # the child process has finished, but the
                # transport hasn't been notified yet?
                self._proc.poll() is None):

            if self._loop.get_debug():
                logger.warning('Close running child process: kill %r', self)

            try:
                self._proc.kill()
            except ProcessLookupError:
                pass

            # Don't clear the _proc reference yet: _connect_pipes() may
            # still run

    def __del__(self, _warn=warnings.warn):
        """Emit a ResourceWarning and close if the transport was leaked."""
        if not self._closed:
            _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
            self.close()

    def get_pid(self):
        """Return the pid of the child process (None if not started)."""
        return self._pid

    def get_returncode(self):
        """Return the exit status, or None if no exit has been reported."""
        return self._returncode

    def get_pipe_transport(self, fd):
        """Return the transport of the pipe for fd, or None.

        None is returned when fd was not requested as a pipe, and also
        when the pipe was requested but has not been connected yet (its
        entry in self._pipes is still None until _connect_pipes() fills
        it in).
        """
        proto = self._pipes.get(fd)
        if proto is None:
            return None
        return proto.pipe

    def _check_proc(self):
        """Raise ProcessLookupError if there is no process object."""
        if self._proc is None:
            raise ProcessLookupError()

    def send_signal(self, signal):
        """Send signal to the child; ProcessLookupError if no process."""
        self._check_proc()
        self._proc.send_signal(signal)

    def terminate(self):
        """Terminate the child; ProcessLookupError if no process."""
        self._check_proc()
        self._proc.terminate()

    def kill(self):
        """Kill the child; ProcessLookupError if no process."""
        self._check_proc()
        self._proc.kill()

    async def _connect_pipes(self, waiter):
        """Connect the child's stdin/stdout/stderr pipes to the loop.

        On success, schedule the protocol's connection_made() followed by
        every callback queued by _call(), then complete waiter with None.
        On failure, store the exception in waiter instead; SystemExit and
        KeyboardInterrupt are re-raised.
        """
        try:
            proc = self._proc
            loop = self._loop

            # The second item returned by connect_*_pipe() is the pipe
            # protocol object; it is what close(), __repr__() and
            # _try_finish() later read .pipe / .disconnected from.
            if proc.stdin is not None:
                _, pipe = await loop.connect_write_pipe(
                    lambda: WriteSubprocessPipeProto(self, 0),
                    proc.stdin)
                self._pipes[0] = pipe

            if proc.stdout is not None:
                _, pipe = await loop.connect_read_pipe(
                    lambda: ReadSubprocessPipeProto(self, 1),
                    proc.stdout)
                self._pipes[1] = pipe

            if proc.stderr is not None:
                _, pipe = await loop.connect_read_pipe(
                    lambda: ReadSubprocessPipeProto(self, 2),
                    proc.stderr)
                self._pipes[2] = pipe

            assert self._pending_calls is not None

            # connection_made() is scheduled first so that the queued
            # callbacks are delivered after it, in their original order.
            loop.call_soon(self._protocol.connection_made, self)
            for callback, data in self._pending_calls:
                loop.call_soon(callback, *data)
            # From now on _call() schedules callbacks directly.
            self._pending_calls = None
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            if waiter is not None and not waiter.cancelled():
                waiter.set_exception(exc)
        else:
            if waiter is not None and not waiter.cancelled():
                waiter.set_result(None)

    def _call(self, cb, *data):
        """Schedule cb(*data), or queue it until the pipes are connected."""
        if self._pending_calls is not None:
            self._pending_calls.append((cb, data))
        else:
            self._loop.call_soon(cb, *data)

    def _pipe_connection_lost(self, fd, exc):
        """Report a lost pipe to the protocol and check for completion."""
        self._call(self._protocol.pipe_connection_lost, fd, exc)
        self._try_finish()

    def _pipe_data_received(self, fd, data):
        """Forward data read from pipe fd to the protocol."""
        self._call(self._protocol.pipe_data_received, fd, data)

    def _process_exited(self, returncode):
        """Record the exit status, notify the protocol and wake waiters.

        Must be called at most once, with a returncode that is not None.
        """
        assert returncode is not None, returncode
        assert self._returncode is None, self._returncode
        if self._loop.get_debug():
            logger.info('%r exited with return code %r', self, returncode)
        self._returncode = returncode
        if self._proc.returncode is None:
            # asyncio uses a child watcher: copy the status into the Popen
            # object. On Python 3.6, it is required to avoid a ResourceWarning.
            self._proc.returncode = returncode
        self._call(self._protocol.process_exited)
        self._try_finish()

        # wake up futures waiting for wait()
        for waiter in self._exit_waiters:
            if not waiter.cancelled():
                waiter.set_result(returncode)
        # _wait() returns early once _returncode is set, so the list is
        # no longer needed.
        self._exit_waiters = None

    async def _wait(self):
        """Wait until the process exits and return the process return code.

        This method is a coroutine."""
        if self._returncode is not None:
            return self._returncode

        waiter = self._loop.create_future()
        self._exit_waiters.append(waiter)
        return await waiter

    def _try_finish(self):
        """Schedule connection_lost() once everything has finished.

        That is: the exit status is known and every requested pipe has
        been connected and then disconnected.
        """
        assert not self._finished
        if self._returncode is None:
            return
        if all(p is not None and p.disconnected
               for p in self._pipes.values()):
            self._finished = True
            self._call(self._call_connection_lost, None)

    def _call_connection_lost(self, exc):
        """Call the protocol's connection_lost(), then drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._loop = None
            self._proc = None
            self._protocol = None


class WriteSubprocessPipeProto(protocols.BaseProtocol):
    """Protocol for one pipe of a subprocess.

    proc is the owning subprocess transport (see
    BaseSubprocessTransport._connect_pipes()) and fd the file descriptor
    number the pipe stands for.  Events are forwarded to that transport.
    """

    def __init__(self, proc, fd):
        self.proc = proc
        self.fd = fd
        # Pipe transport, set by connection_made().
        self.pipe = None
        self.disconnected = False

    def connection_made(self, transport):
        """Remember the pipe transport."""
        self.pipe = transport

    def __repr__(self):
        return f'<{self.__class__.__name__} fd={self.fd} pipe={self.pipe!r}>'

    def connection_lost(self, exc):
        """Mark the pipe disconnected and notify the owning transport."""
        # Set the flag first: _pipe_connection_lost() runs _try_finish(),
        # which reads it.
        self.disconnected = True
        self.proc._pipe_connection_lost(self.fd, exc)
        self.proc = None

    def pause_writing(self):
        """Forward pause_writing() to the user protocol."""
        self.proc._protocol.pause_writing()

    def resume_writing(self):
        """Forward resume_writing() to the user protocol."""
        self.proc._protocol.resume_writing()


class ReadSubprocessPipeProto(WriteSubprocessPipeProto,
                              protocols.Protocol):
    """Pipe protocol that additionally forwards received data."""

    def data_received(self, data):
        """Forward data read from the pipe to the owning transport."""
        self.proc._pipe_data_received(self.fd, data)