#!/usr/bin/env python3 # # Copyright 2019, The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Helper util libraries for command line operations.""" import asyncio import sys import time from typing import Tuple, Optional, List import lib.print_utils as print_utils TIMEOUT = 50 SIMULATE = False def run_command_nofail(cmd: List[str], **kwargs) -> None: """Runs cmd list with default timeout. Throws exception if the execution fails. """ my_kwargs = {"timeout": TIMEOUT, "shell": False, "simulate": False} my_kwargs.update(kwargs) passed, out = execute_arbitrary_command(cmd, **my_kwargs) if not passed: raise RuntimeError( "Failed to execute %s (kwargs=%s), output=%s" % (cmd, kwargs, out)) def run_adb_shell_command(cmd: str) -> Tuple[bool, str]: """Runs command using adb shell. Returns: A tuple of running status (True=succeeded, False=failed or timed out) and std output (string contents of stdout with trailing whitespace removed). """ return run_shell_command('adb shell "{}"'.format(cmd)) def run_shell_func(script_path: str, func: str, args: List[str]) -> Tuple[bool, str]: """Runs shell function with default timeout. Returns: A tuple of running status (True=succeeded, False=failed or timed out) and std output (string contents of stdout with trailing whitespace removed) . """ if args: cmd = 'bash -c "source {script_path}; {func} {args}"'.format( script_path=script_path, func=func, args=' '.join("'{}'".format(arg) for arg in args)) else: cmd = 'bash -c "source {script_path}; {func}"'.format( script_path=script_path, func=func) print_utils.debug_print(cmd) return run_shell_command(cmd) def run_shell_command(cmd: str) -> Tuple[bool, str]: """Runs shell command with default timeout. Returns: A tuple of running status (True=succeeded, False=failed or timed out) and std output (string contents of stdout with trailing whitespace removed) . """ return execute_arbitrary_command([cmd], TIMEOUT, shell=True, simulate=SIMULATE) def execute_arbitrary_command(cmd: List[str], timeout: int, shell: bool, simulate: bool) -> Tuple[bool, str]: """Run arbitrary shell command with default timeout. Mostly copy from frameworks/base/startop/scripts/app_startup/app_startup_runner.py. Args: cmd: list of cmd strings. timeout: the time limit of running cmd. shell: indicate if the cmd is a shell command. simulate: if it's true, do not run the command and assume the running is successful. Returns: A tuple of running status (True=succeeded, False=failed or timed out) and std output (string contents of stdout with trailing whitespace removed) . """ if simulate: print(cmd) return True, '' print_utils.debug_print('[EXECUTE]', cmd) # block until either command finishes or the timeout occurs. loop = asyncio.get_event_loop() (return_code, script_output) = loop.run_until_complete( _run_command(*cmd, shell=shell, timeout=timeout)) script_output = script_output.decode() # convert bytes to str passed = (return_code == 0) print_utils.debug_print('[$?]', return_code) if not passed: print('[FAILED, code:%s]' % (return_code), script_output, file=sys.stderr) return passed, script_output.rstrip() async def _run_command(*args: List[str], shell: bool = False, timeout: Optional[int] = None) -> Tuple[int, bytes]: if shell: process = await asyncio.create_subprocess_shell( *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT) else: process = await asyncio.create_subprocess_exec( *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT) script_output = b'' print_utils.debug_print('[PID]', process.pid) timeout_remaining = timeout time_started = time.time() # read line (sequence of bytes ending with b'\n') asynchronously while True: try: line = await asyncio.wait_for(process.stdout.readline(), timeout_remaining) print_utils.debug_print('[STDOUT]', line) script_output += line if timeout_remaining: time_elapsed = time.time() - time_started timeout_remaining = timeout - time_elapsed except asyncio.TimeoutError: print_utils.debug_print('[TIMEDOUT] Process ', process.pid) print_utils.debug_print('[TIMEDOUT] Sending SIGTERM.') process.terminate() # 5 second timeout for process to handle SIGTERM nicely. try: (remaining_stdout, remaining_stderr) = await asyncio.wait_for(process.communicate(), 5) script_output += remaining_stdout except asyncio.TimeoutError: print_utils.debug_print('[TIMEDOUT] Sending SIGKILL.') process.kill() # 5 second timeout to finish with SIGKILL. try: (remaining_stdout, remaining_stderr) = await asyncio.wait_for(process.communicate(), 5) script_output += remaining_stdout except asyncio.TimeoutError: # give up, this will leave a zombie process. print_utils.debug_print('[TIMEDOUT] SIGKILL failed for process ', process.pid) time.sleep(100) return -1, script_output else: if not line: # EOF break code = await process.wait() # wait for child process to exit return code, script_output