1#!/usr/bin/env python3 2# 3# Copyright 2024 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16# 17 18"""Classes to help coordinate running tasks and displaying progress.""" 19 20import subprocess 21import sys 22import threading 23 24from .errors import TaskError 25 26 27class Task: 28 """Defines a task to be run by the task_runner.""" 29 30 def __init__(self, cmd, fall_back_tasks=None): 31 self.cmd = cmd 32 self.fall_back_tasks = fall_back_tasks 33 34 35class TaskResult: 36 """Holds result and status code of a task.""" 37 38 def __init__(self, status_code, result=''): 39 self.status_code = status_code 40 self.result = result 41 42 43class TaskRunner: 44 """Runs a set of tasks and displays progress.""" 45 46 def __init__(self): 47 self.tasks = {} 48 self.task_queue = [] 49 50 self.running = False 51 52 # UI 53 self.quiet = False 54 self.output = '' 55 self.running_indicator_thread = None 56 self.running_indicator_chars = ['→'] 57 # self.running_indicator_chars = ['◢', '◣', '◤', '◥'] 58 self.running_indicator_index = 0 59 self.stop_event = threading.Event() 60 61 def add_task(self, name, function, *args, fall_back_tasks=None, **kwargs): 62 """Adds a task to the queue.""" 63 self.tasks[name] = { 64 'status': 'pending', 65 'function': function, 66 'output': '', 67 'args': args, 68 'kwargs': kwargs, 69 'fall_back_tasks': fall_back_tasks, 70 } 71 self.task_queue.append(name) 72 73 def start(self): 74 """Starts running all the tasks in the queue.""" 75 print('Running Plan:') 76 self.running = True 77 self._run_next_task() 78 79 def run_task(self, name): 80 """Run this task in the queue.""" 81 task = self.tasks[name] 82 self.render_output() 83 try: 84 for line in task['function'](*task['args'], **task['kwargs']): 85 if isinstance(line, TaskResult): 86 result = line 87 if result.status_code != 0: 88 raise TaskError(f'status_code: {result.status_code}') 89 else: 90 self.tasks[name]['output'] += line 91 self.tasks[name]['status'] = 'completed' 92 if self.running: 93 self._run_next_task() 94 except TaskError as e: 95 self.tasks[name]['status'] = 'failed' 96 self.tasks[name]['output'] += f'Error: {e}\n' 97 self.render_output() 98 99 fall_back_tasks = self.tasks[name].get('fall_back_tasks', []) 100 if fall_back_tasks: 101 self.task_queue = [] 102 for t in fall_back_tasks: 103 if isinstance(t, str): 104 self.add_shell_command_task([t]) 105 self._run_next_task() 106 else: 107 if self.running: 108 self.running = False 109 110 def _run_next_task(self): 111 """Runs the next task in the queue.""" 112 if self.task_queue and self.running: 113 name = self.task_queue.pop(0) 114 self.tasks[name]['status'] = 'running' 115 threading.Thread(target=self.run_task, args=(name,)).start() 116 elif self.running: 117 self.running = False 118 self.render_output() 119 120 if self.quiet: 121 return 122 123 print('') 124 print('Run Completed Successfully!') 125 print('') 126 127 def add_shell_command_task(self, command, fall_back_tasks=None): 128 """Adds a shell command to the task queue.""" 129 self.add_task( 130 command, run_shell_command, command, fall_back_tasks=fall_back_tasks 131 ) 132 133 def render_output(self): 134 """Prints the output of the tasks as well as a table showing the progres on the task queue.""" 135 if self.quiet: 136 return 137 138 # os.system('cls' if os.name == 'nt' else 'clear') 139 print(f'{self.output}', end='') 140 for name, command_data in self.tasks.items(): 141 print(f"{command_data['output']}", end='') 142 143 for name, command_data in self.tasks.items(): 144 status_icon = '.' 145 status_color = '\033[94m' # Blue 146 if command_data['status'] == 'completed': 147 status_icon = '✓' 148 status_color = '\033[32m' # Green 149 elif command_data['status'] == 'running': 150 status_icon = self.running_indicator_chars[self.running_indicator_index] 151 status_color = '\033[32m' # Green 152 elif command_data['status'] == 'failed': 153 status_icon = '✗' 154 status_color = '\033[91m' # Red 155 print(f'{status_color}{status_icon}\033[0m {name}\033[0m') 156 print('-' * 20) 157 158 159def run_shell_command(command, use_stdout=True): 160 """Run a shell command and yield output.""" 161 last_line = '' 162 163 if use_stdout: 164 with subprocess.Popen( 165 command, 166 shell=True, 167 stdout=sys.stdout, 168 stderr=sys.stderr, 169 text=True, 170 ) as process: 171 status_code = process.wait() 172 yield TaskResult(status_code=status_code) 173 else: 174 with subprocess.Popen( 175 command, 176 shell=True, 177 text=True, 178 ) as process: 179 status_code = process.wait() 180 for line in iter(process.stdout.readline, ''): 181 if line.strip() == last_line: 182 continue 183 last_line = line.strip() 184 yield line 185 process.stdout.flush() 186 process.stdout.close() 187 status_code = process.wait() 188 yield TaskResult(status_code=status_code) 189