#!/usr/bin/env python
# Copyright 2022 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
""" Prompt toolkit application for pw watch. """

import asyncio
import logging
from pathlib import Path
import re
import sys
from typing import List, NoReturn, Optional

from prompt_toolkit.application import Application
from prompt_toolkit.clipboard.pyperclip import PyperclipClipboard
from prompt_toolkit.filters import Condition
from prompt_toolkit.history import (
    FileHistory,
    History,
    ThreadedHistory,
)
from prompt_toolkit.key_binding import (
    KeyBindings,
    KeyBindingsBase,
    merge_key_bindings,
)
from prompt_toolkit.layout import (
    Dimension,
    DynamicContainer,
    Float,
    FloatContainer,
    FormattedTextControl,
    HSplit,
    Layout,
    Window,
)
from prompt_toolkit.layout.controls import BufferControl
from prompt_toolkit.styles import DynamicStyle, merge_styles, Style
from prompt_toolkit.formatted_text import StyleAndTextTuples

from pw_console.console_app import get_default_colordepth
from pw_console.console_prefs import ConsolePrefs
from pw_console.get_pw_console_app import PW_CONSOLE_APP_CONTEXTVAR
from pw_console.log_pane import LogPane
from pw_console.plugin_mixin import PluginMixin
# Imported explicitly: WatchApp.__init__ calls
# pw_console.python_logging.setup_python_logging() and must not depend on
# another module having loaded this submodule as a side effect.
import pw_console.python_logging
from pw_console.quit_dialog import QuitDialog
import pw_console.style
import pw_console.widgets.border
from pw_console.window_manager import WindowManager

_NINJA_LOG = logging.getLogger('pw_watch_ninja_output')
_LOG = logging.getLogger('pw_watch')


class WatchWindowManager(WindowManager):
    """WindowManager that stores its root container on the application."""
    def update_root_container_body(self):
        """Recreate the root container and publish it on the application.

        The new container is assigned to
        ``self.application.window_manager_container``; WatchApp's layout
        reads that attribute through a DynamicContainer.
        """
        self.application.window_manager_container = (
            self.create_root_container())


class WatchApp(PluginMixin):
    """Pigweed Watch main window application."""
    # pylint: disable=too-many-instance-attributes

    NINJA_FAILURE_TEXT = '\033[31mFAILED: '

    NINJA_BUILD_STEP = re.compile(
        r"^\[(?P<step>[0-9]+)/(?P<total_steps>[0-9]+)\] (?P<action>.*)$")

    def __init__(self,
                 event_handler,
                 debug_logging: bool = False,
                 log_file_name: Optional[str] = None):
        """Build the log pane, layout, key bindings and Application.

        Args:
          event_handler: Object supplying build state. This class reads its
            ``status_message``, ``result_message``, ``build_commands``,
            ``current_build_percent``, ``current_build_step``,
            ``current_build_errors`` and ``current_stdout`` attributes and
            calls its ``rebuild()`` method.
          debug_logging: If True the 'pw_watch' logger is shown in the log
            pane at DEBUG level, otherwise at INFO level.
          log_file_name: Optional log file path. It is only used to print
            where logs were saved when the application exits.
        """

        self.event_handler = event_handler

        self.external_logfile: Optional[Path] = (Path(log_file_name)
                                                 if log_file_name else None)
        self.color_depth = get_default_colordepth()

        # Necessary for some of pw_console's window manager features to work
        # such as mouse drag resizing.
        PW_CONSOLE_APP_CONTEXTVAR.set(self)  # type: ignore

        self.prefs = ConsolePrefs()

        self.quit_dialog = QuitDialog(self, self.exit)  # type: ignore

        self.search_history_filename = self.prefs.search_history
        # History instance for search toolbars.
        self.search_history: History = ThreadedHistory(
            FileHistory(str(self.search_history_filename)))

        self.window_manager = WatchWindowManager(self)

        pw_console.python_logging.setup_python_logging()

        # State used by check_build_status() to jump to the first error of a
        # build only once.
        self._build_error_count = 0
        self._errors_in_output = False

        self.ninja_log_pane = LogPane(application=self,
                                      pane_title='Pigweed Watch')
        self.ninja_log_pane.add_log_handler(_NINJA_LOG, level_name='INFO')
        self.ninja_log_pane.add_log_handler(
            _LOG, level_name=('DEBUG' if debug_logging else 'INFO'))
        # Set python log format to just the message itself.
        self.ninja_log_pane.log_view.log_store.formatter = logging.Formatter(
            '%(message)s')
        self.ninja_log_pane.table_view = False
        # Enable line wrapping
        self.ninja_log_pane.toggle_wrap_lines()
        # Blank right side toolbar text
        self.ninja_log_pane._pane_subtitle = ' '
        self.ninja_log_view = self.ninja_log_pane.log_view

        # Make tab and shift-tab search for next and previous error
        next_error_bindings = KeyBindings()

        @next_error_bindings.add('s-tab')
        def _previous_error(_event):
            self.jump_to_error(backwards=True)

        @next_error_bindings.add('tab')
        def _next_error(_event):
            self.jump_to_error()

        existing_log_bindings: Optional[KeyBindingsBase] = (
            self.ninja_log_pane.log_content_control.key_bindings)

        # Keep whatever bindings the log pane already had and add the error
        # navigation keys after them.
        key_binding_list: List[KeyBindingsBase] = []
        if existing_log_bindings:
            key_binding_list.append(existing_log_bindings)
        key_binding_list.append(next_error_bindings)
        self.ninja_log_pane.log_content_control.key_bindings = (
            merge_key_bindings(key_binding_list))

        self.window_manager.add_pane(self.ninja_log_pane)

        self.window_manager_container = (
            self.window_manager.create_root_container())

        self.status_bar_border_style = 'class:command-runner-border'

        self.root_container = FloatContainer(
            HSplit([
                pw_console.widgets.border.create_border(
                    HSplit([
                        # The top toolbar.
                        Window(
                            content=FormattedTextControl(
                                self.get_statusbar_text),
                            height=Dimension.exact(1),
                            style='class:toolbar_inactive',
                        ),
                        # Result Toolbar.
                        Window(
                            content=FormattedTextControl(
                                self.get_resultbar_text),
                            # One row per build command.
                            height=lambda: len(self.event_handler.
                                               build_commands),
                            style='class:toolbar_inactive',
                        ),
                    ]),
                    # A callable so the border follows the style chosen in
                    # get_statusbar_text().
                    border_style=lambda: self.status_bar_border_style,
                    base_style='class:toolbar_inactive',
                    left_margin_columns=1,
                    right_margin_columns=1,
                ),
                # The main content.
                DynamicContainer(lambda: self.window_manager_container),
            ]),
            floats=[
                Float(
                    content=self.quit_dialog,
                    top=2,
                    left=2,
                ),
            ],
        )

        key_bindings = KeyBindings()

        @key_bindings.add('enter', filter=self.input_box_not_focused())
        def _run_build(_event):
            "Rebuild."
            self.run_build()

        register = self.prefs.register_keybinding

        @register('global.exit-no-confirmation', key_bindings)
        def _quit_no_confirm(_event):
            """Quit without confirmation."""
            _LOG.info('Got quit signal; exiting...')
            self.exit(0)

        @register('global.exit-with-confirmation', key_bindings)
        def _quit_with_confirm(_event):
            """Quit with confirmation dialog."""
            self.quit_dialog.open_dialog()

        self.key_bindings = merge_key_bindings([
            self.window_manager.key_bindings,
            key_bindings,
        ])

        self.current_theme = pw_console.style.generate_styles(
            self.prefs.ui_theme)
        self.current_theme = merge_styles([
            self.current_theme,
            Style.from_dict({'search': 'bg:ansired ansiblack'}),
        ])

        self.layout = Layout(self.root_container,
                             focused_element=self.ninja_log_pane)

        self.application: Application = Application(
            layout=self.layout,
            key_bindings=self.key_bindings,
            mouse_support=True,
            color_depth=self.color_depth,
            clipboard=PyperclipClipboard(),
            style=DynamicStyle(lambda: merge_styles([
                self.current_theme,
            ])),
            full_screen=True,
        )

        self.plugin_init(
            plugin_callback=self.check_build_status,
            plugin_callback_frequency=0.5,
            plugin_logger_name='pw_watch_stdout_checker',
        )

    def jump_to_error(self, backwards: bool = False) -> None:
        """Move the log view to the next or previous search match.

        If the log view has no search text yet, the search regex is first set
        to '^FAILED: '. An existing search is left untouched and reused.

        Args:
          backwards: Search backwards instead of forwards.
        """
        log_view = self.ninja_log_pane.log_view
        if not log_view.search_text:
            log_view.set_search_regex('^FAILED: ', False, None)

        if backwards:
            log_view.search_backwards()
        else:
            log_view.search_forwards()

        log_view.log_screen.reset_logs(log_index=log_view.log_index)
        log_view.move_selected_line_to_top()

    def update_menu_items(self):
        """Required by the Window Manager Class."""

    def redraw_ui(self):
        """Redraw the prompt_toolkit UI."""
        # May be called while __init__ is still running, before
        # self.application has been assigned.
        if hasattr(self, 'application'):
            self.application.invalidate()

    def focus_on_container(self, pane):
        """Set application focus to a specific container."""
        self.application.layout.focus(pane)

    def focused_window(self):
        """Return the currently focused window."""
        return self.application.layout.current_window

    def command_runner_is_open(self) -> bool:
        """Always return False."""
        # pylint: disable=no-self-use
        return False

    def clear_ninja_log(self) -> None:
        """Clear the stored log lines and reset the log view."""
        self.ninja_log_view.log_store.clear_logs()
        self.ninja_log_view._restart_filtering()  # pylint: disable=protected-access
        self.ninja_log_view.view_mode_changed()
        # The old output is gone, so let check_build_status() jump to the
        # first error of the next build again.
        self._errors_in_output = False

    def run_build(self):
        """Manually trigger a rebuild."""
        self.clear_ninja_log()
        self.event_handler.rebuild()

    def rebuild_on_filechange(self):
        """Clear the log view ahead of a rebuild caused by a file change."""
        self.ninja_log_view.log_store.clear_logs()
        self.ninja_log_view.view_mode_changed()
        # Same reset as clear_ninja_log(): without it only the first failing
        # build would ever trigger the automatic jump to an error.
        self._errors_in_output = False

    def get_statusbar_text(self) -> StyleAndTextTuples:
        """Return the formatted text fragments for the top status bar.

        Also updates ``self.status_bar_border_style`` to green, yellow (while
        building) or red (when the event handler reports build errors).
        """
        status = self.event_handler.status_message
        # Bound unconditionally: the error branch below uses the separator
        # even when there is no status message.
        separator = ('', ' ')
        fragments: StyleAndTextTuples = [('class:logo', 'Pigweed Watch')]
        is_building = False
        if status:
            fragments = [status]
            is_building = status[1].endswith('Building')
            self.status_bar_border_style = 'class:theme-fg-green'

        if is_building:
            percent = self.event_handler.current_build_percent * 100
            fragments.append(separator)
            fragments.append(('ansicyan', f'{percent:.0f}%'))
            self.status_bar_border_style = 'class:theme-fg-yellow'

        if self.event_handler.current_build_errors > 0:
            fragments.append(separator)
            fragments.append(('', 'Errors:'))
            fragments.append(
                ('ansired', str(self.event_handler.current_build_errors)))
            self.status_bar_border_style = 'class:theme-fg-red'

        if is_building:
            fragments.append(separator)
            fragments.append(('', self.event_handler.current_build_step))

        return fragments

    def get_resultbar_text(self) -> StyleAndTextTuples:
        """Return the result toolbar text, or 'Loading...' if there is none."""
        result = self.event_handler.result_message
        if not result:
            result = [('', 'Loading...')]
        return result

    def exit(self, exit_code: int = 0) -> None:
        """Stop the prompt_toolkit application and exit the process.

        Args:
          exit_code: Passed to ``Application.exit()`` as the result, and from
            there to ``sys.exit()`` once the application future completes.
        """
        log_file = self.external_logfile

        def _really_exit(future: asyncio.Future) -> NoReturn:
            if log_file:
                # Print a message showing where logs were saved to.
                print(f'Logs saved to: {log_file.resolve()}')
            sys.exit(future.result())

        # The callback must be attached before exit() completes the future.
        if self.application.future:
            self.application.future.add_done_callback(_really_exit)
        self.application.exit(result=exit_code)

    def check_build_status(self) -> bool:
        """Periodic plugin callback that jumps to the first build error.

        Returns:
          False if the event handler has no stdout yet, True otherwise.
          NOTE(review): presumably PluginMixin uses the return value to
          decide whether to redraw; confirm against PluginMixin.
        """
        if not self.event_handler.current_stdout:
            return False

        # Already jumped for the current output; do not move the view again.
        if self._errors_in_output:
            return True

        if self.event_handler.current_build_errors > self._build_error_count:
            self._errors_in_output = True
            self.jump_to_error()

        return True

    def run(self):
        """Start the plugin callback and run the application."""
        self.plugin_start()
        # Run the prompt_toolkit application
        self.application.run(set_exception_handler=True)

    def input_box_not_focused(self) -> Condition:
        """Condition checking the focused control is not a text input field."""
        @Condition
        def _test() -> bool:
            """Check if the currently focused control is an input buffer.

            Returns:
                bool: True if the currently focused control is not a text input
                    box. For example if the user presses enter when typing in
                    the search box, return False.
            """
            return not isinstance(self.application.layout.current_control,
                                  BufferControl)

        return _test