• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
#!/usr/bin/env python3
#
# Copyright 2024 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Classes to help coordinate running tasks and displaying progress."""
19
20import subprocess
21import sys
22import threading
23
24from .errors import TaskError
25
26
class Task:
  """Describes one unit of work to be run by the task_runner.

  Attributes:
    cmd: The command associated with this task, stored as given.
    fall_back_tasks: Optional fall back tasks stored alongside the command;
      None when the caller does not supply any.
  """

  def __init__(self, cmd, fall_back_tasks=None):
    # Values are kept exactly as passed in; nothing is validated or copied.
    self.cmd, self.fall_back_tasks = cmd, fall_back_tasks
33
34
class TaskResult:
  """Carries the status code of a task together with an optional result.

  Attributes:
    status_code: Status code reported for the task.
    result: Result payload; defaults to an empty string.
  """

  def __init__(self, status_code, result=''):
    # Stored verbatim; interpretation is left to whoever consumes the result.
    self.status_code, self.result = status_code, result
41
42
class TaskRunner:
  """Runs a set of tasks and displays progress.

  Tasks are registered with add_task() or add_shell_command_task() and are
  executed one at a time, in the order they were added, once start() is
  called. Every task is run on a freshly started thread, and the next task is
  only started after the previous one has finished.

  Attributes:
    tasks: Dict mapping a task name to its record. A record holds 'status'
      ('pending', 'running', 'completed' or 'failed'), 'function', 'output',
      'args', 'kwargs' and 'fall_back_tasks'.
    task_queue: Names of the tasks that have not been started yet.
    running: True while the runner is still working through the queue.
    quiet: When True, render_output() and the completion message print
      nothing.
    output: Text printed by render_output() ahead of the per-task output.
  """

  def __init__(self):
    self.tasks = {}
    self.task_queue = []

    self.running = False

    # UI
    self.quiet = False
    self.output = ''
    # NOTE(review): running_indicator_thread and stop_event are not used by
    # any method of this class; presumably reserved for an animated
    # indicator -- confirm before removing.
    self.running_indicator_thread = None
    self.running_indicator_chars = ['→']
    self.running_indicator_index = 0
    self.stop_event = threading.Event()

  def add_task(self, name, function, *args, fall_back_tasks=None, **kwargs):
    """Adds a task to the queue.

    Args:
      name: Key under which the task is stored and shown in the progress
        table. It must be hashable; re-using a name replaces the old record.
      function: Callable invoked as function(*args, **kwargs). It must return
        an iterable of output strings and/or TaskResult objects.
      *args: Positional arguments forwarded to function.
      fall_back_tasks: Optional list of shell command strings to run if this
        task fails.
      **kwargs: Keyword arguments forwarded to function.
    """
    self.tasks[name] = {
        'status': 'pending',
        'function': function,
        'output': '',
        'args': args,
        'kwargs': kwargs,
        'fall_back_tasks': fall_back_tasks,
    }
    self.task_queue.append(name)

  def start(self):
    """Starts running all the tasks in the queue."""
    print('Running Plan:')
    self.running = True
    self._run_next_task()

  def run_task(self, name):
    """Runs the named task and then schedules whatever should run next.

    Every item produced by the task function that is a TaskResult is checked
    for a non-zero status code; every other item is appended to the task's
    output.

    On success the next queued task is started. On TaskError the task is
    marked as failed: if it has fall back tasks, the remaining queue is
    discarded and the string entries are queued as shell commands; otherwise
    the runner stops.

    Args:
      name: Key of the task in self.tasks.
    """
    task = self.tasks[name]
    self.render_output()
    try:
      for line in task['function'](*task['args'], **task['kwargs']):
        if isinstance(line, TaskResult):
          if line.status_code != 0:
            raise TaskError(f'status_code: {line.status_code}')
        else:
          task['output'] += line
      task['status'] = 'completed'
      if self.running:
        self._run_next_task()
    except TaskError as e:
      task['status'] = 'failed'
      task['output'] += f'Error: {e}\n'
      self.render_output()

      fall_back_tasks = task.get('fall_back_tasks')
      if not fall_back_tasks:
        # Nothing to fall back on: stop working through the queue.
        self.running = False
        return

      # Fall back tasks replace whatever was still waiting in the queue.
      self.task_queue = []
      for fall_back in fall_back_tasks:
        if isinstance(fall_back, str):
          # Pass the command string itself. It doubles as the key in
          # self.tasks, so wrapping it in a list would raise
          # "TypeError: unhashable type: 'list'" and no fall back would run.
          self.add_shell_command_task(fall_back)
      self._run_next_task()

  def _run_next_task(self):
    """Runs the next task in the queue.

    If the queue is empty while the runner is still marked as running, the
    run is finished: the final state is rendered and, unless quiet is set, a
    completion message is printed.
    """
    if self.task_queue and self.running:
      name = self.task_queue.pop(0)
      self.tasks[name]['status'] = 'running'
      threading.Thread(target=self.run_task, args=(name,)).start()
    elif self.running:
      self.running = False
      self.render_output()

      if self.quiet:
        return

      print('')
      print('Run Completed Successfully!')
      print('')

  def add_shell_command_task(self, command, fall_back_tasks=None):
    """Adds a shell command to the task queue.

    Args:
      command: Shell command to run. It is also used as the task name, so it
        must be hashable (i.e. a string rather than a list).
      fall_back_tasks: Optional list of shell command strings to run if the
        command fails.
    """
    self.add_task(
        command, run_shell_command, command, fall_back_tasks=fall_back_tasks
    )

  def render_output(self):
    """Prints the output of the tasks as well as a table showing the progress on the task queue."""
    if self.quiet:
      return

    print(f'{self.output}', end='')
    for command_data in self.tasks.values():
      print(f"{command_data['output']}", end='')

    for name, command_data in self.tasks.items():
      # Defaults cover any status without a dedicated icon (e.g. 'pending').
      status_icon = '.'
      status_color = '\033[94m'  # Blue
      if command_data['status'] == 'completed':
        status_icon = '✓'
        status_color = '\033[32m'  # Green
      elif command_data['status'] == 'running':
        status_icon = self.running_indicator_chars[self.running_indicator_index]
        status_color = '\033[32m'  # Green
      elif command_data['status'] == 'failed':
        status_icon = '✗'
        status_color = '\033[91m'  # Red
      print(f'{status_color}{status_icon}\033[0m {name}\033[0m')
    print('-' * 20)
157
158
def run_shell_command(command, use_stdout=True):
  """Runs a shell command and yields its output and final status.

  Args:
    command: Command handed to subprocess.Popen with shell=True, so it is
      interpreted by the shell. Callers must not build it from untrusted
      input.
    use_stdout: When True, the child writes directly to this process's
      stdout and stderr and only the final TaskResult is yielded. When False,
      the child's stdout is captured and yielded line by line; its stderr is
      not redirected.

  Yields:
    In capture mode, each stdout line (trailing newline included) whose
    stripped text differs from the stripped text of the previously yielded
    line. The comparison starts from an empty string, so leading blank lines
    are skipped as well. In both modes the last item is a TaskResult holding
    the command's exit status.
  """
  if use_stdout:
    with subprocess.Popen(
        command,
        shell=True,
        stdout=sys.stdout,
        stderr=sys.stderr,
        text=True,
    ) as process:
      status_code = process.wait()
    yield TaskResult(status_code=status_code)
    return

  last_line = ''
  with subprocess.Popen(
      command,
      shell=True,
      # Without a pipe process.stdout is None and nothing can be read back.
      stdout=subprocess.PIPE,
      text=True,
  ) as process:
    # Drain stdout before waiting: waiting first can deadlock once the child
    # has filled the pipe buffer.
    for line in process.stdout:
      stripped = line.strip()
      if stripped == last_line:
        continue
      last_line = stripped
      yield line
    status_code = process.wait()
  yield TaskResult(status_code=status_code)
189