1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3 4# 5# Copyright (c) 2023 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19from __future__ import print_function 20from __future__ import unicode_literals 21 22import sys 23import importlib 24 25from prompt_toolkit.shortcuts import run_application 26from prompt_toolkit.application import Application 27from prompt_toolkit.key_binding.manager import KeyBindingManager 28from prompt_toolkit.keys import Keys 29from prompt_toolkit.layout.containers import Window 30from prompt_toolkit.filters import IsDone 31from prompt_toolkit.layout.controls import TokenListControl 32from prompt_toolkit.layout.containers import ConditionalContainer 33from prompt_toolkit.layout.containers import HSplit 34from prompt_toolkit.layout.dimension import LayoutDimension as D 35from prompt_toolkit.token import Token 36from exceptions.ohos_exception import OHOSException 37from services.interface.menu_interface import MenuInterface 38from helper.separator import Separator 39from containers.arg import Arg, ModuleType 40from resources.config import Config 41from util.log_util import LogUtil 42from util.product_util import ProductUtil 43 44 45class Menu(MenuInterface): 46 47 def select_compile_option(self) -> dict: 48 config = Config() 49 results = {} 50 all_build_args = Arg.parse_all_args(ModuleType.BUILD) 51 for arg in all_build_args.values(): 52 if isinstance(arg, Arg) and arg.arg_attribute.get("optional"): 53 if arg.arg_name != 'target_cpu': 54 choices = [ 55 choice if isinstance(choice, Separator) else { 56 'name': choice, 57 'value': arg.arg_name 58 } for choice in arg.arg_attribute.get("optional") 59 ] 60 else: 61 if config.support_cpu is not None and isinstance(config.support_cpu, list): 62 choices = [] 63 for cpu in config.support_cpu: 64 choices.append({ 65 'name': cpu, 66 'value': arg.arg_name, 67 }) 68 elif config.target_cpu is not None: 69 choices = [ 70 { 71 'name': config.target_cpu, 72 'value': arg.arg_name, 73 } 74 ] 75 else: 76 choices = [ 77 { 78 'name': all_build_args.get('target_cpu').arg_value, 79 'value': arg.arg_name, 80 } 81 ] 82 83 result = self._list_promt(arg.arg_name, 'select {} value'.format( 84 arg.arg_name), choices).get(arg.arg_name)[0] 85 results[arg.arg_name] = result 86 return results 87 88 def _select_os_level(self) -> str: 89 choices = [ 90 { 91 'name': 'mini', 92 'value': 'os_level' 93 }, 94 { 95 'name': 'small', 96 'value': 'os_level' 97 }, 98 { 99 'name': 'standard', 100 'value': 'os_level' 101 } 102 ] 103 return self._list_promt('os_level', 'Which os_level do you need?', 104 choices).get('os_level')[0] 105 106 def select_product(self) -> dict: 107 product_path_dict = {} 108 company_separator = None 109 os_level = self._select_os_level() 110 for product_info in ProductUtil.get_products(): 111 if product_info['os_level'] is None: 112 raise OHOSException("") 113 if product_info['os_level'] == os_level: 114 company = product_info['company'] 115 product = product_info['name'] 116 if company_separator is None or company_separator != company: 117 company_separator = company 118 product_key = Separator(company_separator) 119 product_path_dict[product_key] = None 120 121 product_path_dict['{}@{}'.format(product, 122 company)] = product_info 123 124 if not len(product_path_dict): 125 raise OHOSException('no valid product found') 126 127 choices = [ 128 product if isinstance(product, Separator) else { 129 'name': product.split('@')[0], 130 'value': product.split('@')[1] 131 } for product in product_path_dict.keys() 132 ] 133 product = self._list_promt('product', 'Which product do you need?', 134 choices).get('product') 135 product_key = f'{product[0]}@{product[1]}' 136 return product_path_dict.get(product_key) 137 138 def _list_promt(self, name: str, message: str, choices: list, **kwargs): 139 questions = self._get_questions('list', name, message, choices) 140 141 return self._prompt(questions=questions, **kwargs) 142 143 def _get_questions(self, promt_type: str, name: str, message: str, choices: list): 144 questions = [{ 145 'type': promt_type, 146 'qmark': 'OHOS', 147 'name': name, 148 'message': message, 149 'choices': choices 150 }] 151 152 return questions 153 154 def _prompt(self, questions: list, answers=None, **kwargs): 155 if isinstance(questions, dict): 156 questions = [questions] 157 answers = answers or {} 158 159 patch_stdout = kwargs.pop('patch_stdout', False) 160 return_asyncio_coroutine = kwargs.pop( 161 'return_asyncio_coroutine', False) 162 true_color = kwargs.pop('true_color', False) 163 refresh_interval = kwargs.pop('refresh_interval', 0) 164 eventloop = kwargs.pop('eventloop', None) 165 kbi_msg = kwargs.pop('keyboard_interrupt_msg', 'Cancelled by user') 166 167 for question in questions: 168 try: 169 choices = question.get('choices') 170 if choices is not None and callable(choices): 171 question['choices'] = choices(answers) 172 173 _kwargs = {} 174 _kwargs.update(kwargs) 175 _kwargs.update(question) 176 question_type = _kwargs.pop('type') 177 name = _kwargs.pop('name') 178 message = _kwargs.pop('message') 179 when = _kwargs.pop('when', None) 180 question_filter = _kwargs.pop('filter', None) 181 182 if when: 183 # at least a little sanity check! 184 if callable(question['when']): 185 try: 186 if not question['when'](answers): 187 continue 188 except Exception as error: 189 raise ValueError( 190 'Problem in \'when\' check of %s question: %s' % 191 (name, error)) 192 else: 193 raise ValueError('\'when\' needs to be function that ' 194 'accepts a dict argument') 195 if question_filter: 196 # at least a little sanity check! 197 if not callable(question['filter']): 198 raise ValueError('\'filter\' needs to be function that ' 199 'accepts an argument') 200 201 if callable(question.get('default')): 202 _kwargs['default'] = question['default'](answers) 203 204 application = self._question(message, **_kwargs) 205 206 answer = run_application( 207 application, 208 patch_stdout=patch_stdout, 209 return_asyncio_coroutine=return_asyncio_coroutine, 210 true_color=true_color, 211 refresh_interval=refresh_interval, 212 eventloop=eventloop) 213 214 if answer is not None: 215 if question_filter: 216 try: 217 answer = question['filter'](answer) 218 except Exception as error: 219 raise ValueError('Problem processing \'filter\' of' 220 '{} question: {}'.format(name, error)) 221 answers[name] = answer 222 except AttributeError as attr_error: 223 LogUtil.hb_error(attr_error) 224 raise ValueError('No question type \'%s\'' % question_type) 225 except KeyboardInterrupt: 226 LogUtil.hb_warning('') 227 LogUtil.hb_warning(kbi_msg) 228 LogUtil.hb_warning('') 229 return {} 230 return answers 231 232 def _question(self, message: str, **kwargs): 233 if 'choices' not in kwargs: 234 raise OHOSException("You must choose one platform.") 235 236 choices = kwargs.pop('choices', None) 237 qmark = kwargs.pop('qmark', '?') 238 style = kwargs.pop('style', _get_style('terminal')) 239 240 inquirer_control = InquirerControl(choices) 241 242 def get_prompt_tokens(cli): 243 tokens = [] 244 245 tokens.append((Token.QuestionMark, qmark)) 246 tokens.append((Token.Question, ' %s ' % message)) 247 if inquirer_control.answered: 248 tokens.append((Token.Answer, ' ' + 249 inquirer_control.get_selection()[0])) 250 else: 251 tokens.append((Token.Instruction, ' (Use arrow keys)')) 252 return tokens 253 254 # assemble layout 255 layout = HSplit([ 256 Window(height=D.exact(1), 257 content=TokenListControl(get_prompt_tokens)), 258 ConditionalContainer( 259 Window(inquirer_control), 260 filter=~IsDone() 261 ) 262 ]) 263 264 # key bindings 265 manager = KeyBindingManager.for_prompt() 266 267 @manager.registry.add_binding(Keys.ControlQ, eager=True) 268 @manager.registry.add_binding(Keys.ControlC, eager=True) 269 def _(event): 270 raise KeyboardInterrupt() 271 272 @manager.registry.add_binding(Keys.Down, eager=True) 273 def move_cursor_down(event): 274 def _next(): 275 inquirer_control.selected_option_index = ( 276 (inquirer_control.selected_option_index + 1) % 277 inquirer_control.choice_count) 278 _next() 279 while isinstance(inquirer_control.choices[ 280 inquirer_control.selected_option_index][0], Separator) \ 281 or inquirer_control.choices[ 282 inquirer_control.selected_option_index][2]: 283 _next() 284 285 @manager.registry.add_binding(Keys.Up, eager=True) 286 def move_cursor_up(event): 287 def _prev(): 288 inquirer_control.selected_option_index = ( 289 (inquirer_control.selected_option_index - 1) % 290 inquirer_control.choice_count) 291 _prev() 292 while isinstance(inquirer_control.choices[ 293 inquirer_control.selected_option_index][0], Separator) \ 294 or inquirer_control.choices[ 295 inquirer_control.selected_option_index][2]: 296 _prev() 297 298 @manager.registry.add_binding(Keys.Enter, eager=True) 299 def set_answer(event): 300 inquirer_control.answered = True 301 event.cli.set_return_value(inquirer_control.get_selection()) 302 303 return Application( 304 layout=layout, 305 key_bindings_registry=manager.registry, 306 mouse_support=False, 307 style=style 308 ) 309 310 311class InquirerControl(TokenListControl): 312 def __init__(self, choices: list, **kwargs): 313 self.selected_option_index = 0 314 self.answered = False 315 self.choices = choices 316 self._init_choices(choices) 317 super(InquirerControl, self).__init__(self._get_choice_tokens, 318 **kwargs) 319 320 def _init_choices(self, choices: list, default=None): 321 # helper to convert from question format to internal format 322 self.choices = [] # list (name, value, disabled) 323 searching_first_choice = True 324 for index, choice in enumerate(choices): 325 if isinstance(choice, Separator): 326 self.choices.append((choice, None, None)) 327 else: 328 base_string = str if sys.version_info[0] >= 3 else None 329 if isinstance(choice, base_string): 330 self.choices.append((choice, choice, None)) 331 else: 332 name = choice.get('name') 333 value = choice.get('value', name) 334 disabled = choice.get('disabled', None) 335 self.choices.append((name, value, disabled)) 336 if searching_first_choice: 337 self.selected_option_index = index 338 searching_first_choice = False 339 340 @property 341 def choice_count(self): 342 return len(self.choices) 343 344 def _get_choice_tokens(self, cli): 345 tokens = [] 346 token = Token 347 348 def append(index: int, choice: list): 349 selected = (index == self.selected_option_index) 350 351 @_if_mousedown 352 def select_item(cli, mouse_event): 353 # bind option with this index to mouse event 354 self.selected_option_index = index 355 self.answered = True 356 357 tokens.append((token.Pointer if selected else token, ' \u276f ' 358 if selected else ' ')) 359 if selected: 360 tokens.append((Token.SetCursorPosition, '')) 361 if choice[2]: # disabled 362 tokens.append((token.Selected if selected else token, 363 '- %s (%s)' % (choice[0], choice[2]))) 364 else: 365 if isinstance(choice[0], Separator): 366 tokens.append((token.Separator, 367 str(choice[0]), 368 select_item)) 369 else: 370 try: 371 tokens.append((token.Selected if selected else token, 372 str(choice[0]), select_item)) 373 except Exception: 374 tokens.append((token.Selected if selected else 375 token, choice[0], select_item)) 376 tokens.append((token, '\n')) 377 378 # prepare the select choices 379 for i, choice in enumerate(self.choices): 380 append(i, choice) 381 tokens.pop() # Remove last newline. 382 return tokens 383 384 def get_selection(self): 385 return self.choices[self.selected_option_index] 386 387 388def _get_style(style_type: str): 389 style = importlib.import_module('prompt_toolkit.styles') 390 token = importlib.import_module('prompt_toolkit.token') 391 if style_type == 'terminal': 392 return style.style_from_dict({ 393 token.Token.Separator: '#75c951', 394 token.Token.QuestionMark: '#5F819D', 395 token.Token.Selected: '', # default 396 token.Token.Pointer: '#FF9D00 bold', # AWS orange 397 token.Token.Instruction: '', # default 398 token.Token.Answer: '#FF9D00 bold', # AWS orange 399 token.Token.Question: 'bold', 400 }) 401 if style_type == 'answer': 402 return style.style_from_dict({ 403 token.Token.Separator: '#75c951', 404 token.Token.QuestionMark: '#E91E63 bold', 405 token.Token.Selected: '#cc5454', # default 406 token.Token.Pointer: '#ed9164 bold', 407 token.Token.Instruction: '', # default 408 token.Token.Answer: '#f44336 bold', 409 token.Token.Question: '', 410 }) 411 412 return None 413 414 415def _if_mousedown(handler): 416 def handle_if_mouse_down(cli, mouse_event): 417 mouse_events = importlib.import_module('prompt_toolkit.mouse_events') 418 if mouse_event.event_type == mouse_events.MouseEventTypes.MOUSE_DOWN: 419 return handler(cli, mouse_event) 420 else: 421 return NotImplemented 422 423 return handle_if_mouse_down 424