# Copyright 2023 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Tests for pw_cli.process."""

import unittest
import sys
import textwrap

from pw_cli import process

import psutil  # type: ignore


# Timeout handed to process.run in the timeout tests; deliberately far shorter
# than the 100 second sleeps the spawned programs perform.
FAST_TIMEOUT_SECONDS = 0.1

# Return codes accepted as evidence that the process was killed by signal 9:
# Python's subprocess module reports death-by-signal as the negated signal
# number (-9), while shells encode it as 128 + signal number (137).
KILL_SIGNALS = {-9, 137}

# Interpreter used to launch every helper program in these tests.
PYTHON = sys.executable


class RunTest(unittest.TestCase):
    """Tests for process.run."""

    def test_returns_output(self) -> None:
        """The captured output contains what the program printed."""
        echo_str = 'foobar'
        print_str = f'print("{echo_str}")'
        result = process.run(PYTHON, '-c', print_str)
        # Derive the expectation from echo_str so the two cannot drift apart.
        self.assertEqual(result.output, echo_str.encode() + b'\n')

    def test_timeout_kills_process(self) -> None:
        """A program that outlives the timeout ends with a kill return code."""
        sleep_100_seconds = 'import time; time.sleep(100)'
        result = process.run(
            PYTHON, '-c', sleep_100_seconds, timeout=FAST_TIMEOUT_SECONDS
        )
        self.assertIn(result.returncode, KILL_SIGNALS)

    def test_timeout_kills_subprocess(self) -> None:
        """A child spawned by the timed-out program does not survive it."""
        # The launched program spawns a child that sleeps for 100 seconds,
        # prints the child's PID, and then sleeps for 100 seconds itself.
        #
        # PYTHON is interpolated with !r so the generated script receives a
        # correctly quoted and escaped string literal even when the interpreter
        # path contains backslashes or quote characters.
        sleep_in_subprocess = textwrap.dedent(
            f"""
            import subprocess
            import time

            child = subprocess.Popen(
                [{PYTHON!r}, '-c', 'import time; print("booh"); time.sleep(100)']
            )
            print(child.pid, flush=True)
            time.sleep(100)
            """
        )
        result = process.run(
            PYTHON, '-c', sleep_in_subprocess, timeout=FAST_TIMEOUT_SECONDS
        )
        self.assertIn(result.returncode, KILL_SIGNALS)
        # The first line of the output is the PID of the child sleep process.
        child_pid_str = result.output.partition(b'\n')[0]
        child_pid = int(child_pid_str)
        self.assertFalse(psutil.pid_exists(child_pid))


if __name__ == '__main__':
    unittest.main()