• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3
4#
5# Copyright (c) 2023 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19from __future__ import print_function
20from __future__ import unicode_literals
21
22import sys
23import importlib
24
25from prompt_toolkit.shortcuts import run_application
26from prompt_toolkit.application import Application
27from prompt_toolkit.key_binding.manager import KeyBindingManager
28from prompt_toolkit.keys import Keys
29from prompt_toolkit.layout.containers import Window
30from prompt_toolkit.filters import IsDone
31from prompt_toolkit.layout.controls import TokenListControl
32from prompt_toolkit.layout.containers import ConditionalContainer
33from prompt_toolkit.layout.containers import HSplit
34from prompt_toolkit.layout.dimension import LayoutDimension as D
35from prompt_toolkit.token import Token
36from exceptions.ohos_exception import OHOSException
37from services.interface.menu_interface import MenuInterface
38from helper.separator import Separator
39from containers.arg import Arg, ModuleType
40from resources.config import Config
41from util.log_util import LogUtil
42from util.product_util import ProductUtil
43
44
45class Menu(MenuInterface):
46
47    def select_compile_option(self) -> dict:
48        config = Config()
49        results = {}
50        all_build_args = Arg.parse_all_args(ModuleType.BUILD)
51        for arg in all_build_args.values():
52            if isinstance(arg, Arg) and arg.arg_attribute.get("optional"):
53                if arg.arg_name != 'target_cpu':
54                    choices = [
55                        choice if isinstance(choice, Separator) else {
56                            'name': choice,
57                            'value': arg.arg_name
58                        } for choice in arg.arg_attribute.get("optional")
59                    ]
60                else:
61                    if config.support_cpu is not None and isinstance(config.support_cpu, list):
62                        choices = []
63                        for cpu in config.support_cpu:
64                            choices.append({
65                                'name': cpu,
66                                'value': arg.arg_name,
67                            })
68                    elif config.target_cpu is not None:
69                        choices = [
70                            {
71                                'name': config.target_cpu,
72                                'value': arg.arg_name,
73                            }
74                        ]
75                    else:
76                        choices = [
77                            {
78                                'name': all_build_args.get('target_cpu').arg_value,
79                                'value': arg.arg_name,
80                            }
81                        ]
82
83                result = self._list_promt(arg.arg_name, 'select {} value'.format(
84                    arg.arg_name), choices).get(arg.arg_name)[0]
85                results[arg.arg_name] = result
86        return results
87
88    def _select_os_level(self) -> str:
89        choices = [
90            {
91                'name': 'mini',
92                'value': 'os_level'
93            },
94            {
95                'name': 'small',
96                'value': 'os_level'
97            },
98            {
99                'name': 'standard',
100                'value': 'os_level'
101            }
102        ]
103        return self._list_promt('os_level', 'Which os_level do you need?',
104                                choices).get('os_level')[0]
105
106    def select_product(self) -> dict:
107        product_path_dict = {}
108        company_separator = None
109        os_level = self._select_os_level()
110        for product_info in ProductUtil.get_products():
111            if product_info['os_level'] is None:
112                raise OHOSException("")
113            if product_info['os_level'] == os_level:
114                company = product_info['company']
115                product = product_info['name']
116                if company_separator is None or company_separator != company:
117                    company_separator = company
118                    product_key = Separator(company_separator)
119                    product_path_dict[product_key] = None
120
121                product_path_dict['{}@{}'.format(product,
122                                                 company)] = product_info
123
124        if not len(product_path_dict):
125            raise OHOSException('no valid product found')
126
127        choices = [
128            product if isinstance(product, Separator) else {
129                'name': product.split('@')[0],
130                'value': product.split('@')[1]
131            } for product in product_path_dict.keys()
132        ]
133        product = self._list_promt('product', 'Which product do you need?',
134                                   choices).get('product')
135        product_key = f'{product[0]}@{product[1]}'
136        return product_path_dict.get(product_key)
137
138    def _list_promt(self, name, message, choices, **kwargs):
139        questions = self._get_questions('list', name, message, choices)
140
141        return self._prompt(questions=questions, **kwargs)
142
143    def _get_questions(self, promt_type, name, message, choices):
144        questions = [{
145            'type': promt_type,
146            'qmark': 'OHOS',
147            'name': name,
148            'message': message,
149            'choices': choices
150        }]
151
152        return questions
153
154    def _prompt(self, questions, answers=None, **kwargs):
155        if isinstance(questions, dict):
156            questions = [questions]
157        answers = answers or {}
158
159        patch_stdout = kwargs.pop('patch_stdout', False)
160        return_asyncio_coroutine = kwargs.pop(
161            'return_asyncio_coroutine', False)
162        true_color = kwargs.pop('true_color', False)
163        refresh_interval = kwargs.pop('refresh_interval', 0)
164        eventloop = kwargs.pop('eventloop', None)
165        kbi_msg = kwargs.pop('keyboard_interrupt_msg', 'Cancelled by user')
166
167        for question in questions:
168            try:
169                choices = question.get('choices')
170                if choices is not None and callable(choices):
171                    question['choices'] = choices(answers)
172
173                _kwargs = {}
174                _kwargs.update(kwargs)
175                _kwargs.update(question)
176                question_type = _kwargs.pop('type')
177                name = _kwargs.pop('name')
178                message = _kwargs.pop('message')
179                when = _kwargs.pop('when', None)
180                question_filter = _kwargs.pop('filter', None)
181
182                if when:
183                    # at least a little sanity check!
184                    if callable(question['when']):
185                        try:
186                            if not question['when'](answers):
187                                continue
188                        except Exception as error:
189                            raise ValueError(
190                                'Problem in \'when\' check of %s question: %s' %
191                                (name, error))
192                    else:
193                        raise ValueError('\'when\' needs to be function that '
194                                         'accepts a dict argument')
195                if question_filter:
196                    # at least a little sanity check!
197                    if not callable(question['filter']):
198                        raise ValueError('\'filter\' needs to be function that '
199                                         'accepts an argument')
200
201                if callable(question.get('default')):
202                    _kwargs['default'] = question['default'](answers)
203
204                application = self._question(message, **_kwargs)
205
206                answer = run_application(
207                    application,
208                    patch_stdout=patch_stdout,
209                    return_asyncio_coroutine=return_asyncio_coroutine,
210                    true_color=true_color,
211                    refresh_interval=refresh_interval,
212                    eventloop=eventloop)
213
214                if answer is not None:
215                    if question_filter:
216                        try:
217                            answer = question['filter'](answer)
218                        except Exception as error:
219                            raise ValueError('Problem processing \'filter\' of'
220                                             '{} question: {}'.format(name, error))
221                    answers[name] = answer
222            except AttributeError as attr_error:
223                LogUtil.hb_error(attr_error)
224                raise ValueError('No question type \'%s\'' % question_type)
225            except KeyboardInterrupt:
226                LogUtil.hb_warning('')
227                LogUtil.hb_warning(kbi_msg)
228                LogUtil.hb_warning('')
229                return {}
230        return answers
231
232    def _question(self, message, **kwargs):
233        if 'choices' not in kwargs:
234            raise OHOSException("You must choose one platform.")
235
236        choices = kwargs.pop('choices', None)
237        qmark = kwargs.pop('qmark', '?')
238        style = kwargs.pop('style', _get_style('terminal'))
239
240        inquirer_control = InquirerControl(choices)
241
242        def get_prompt_tokens(cli):
243            tokens = []
244
245            tokens.append((Token.QuestionMark, qmark))
246            tokens.append((Token.Question, ' %s ' % message))
247            if inquirer_control.answered:
248                tokens.append((Token.Answer, ' ' +
249                               inquirer_control.get_selection()[0]))
250            else:
251                tokens.append((Token.Instruction, ' (Use arrow keys)'))
252            return tokens
253
254        # assemble layout
255        layout = HSplit([
256            Window(height=D.exact(1),
257                   content=TokenListControl(get_prompt_tokens)),
258            ConditionalContainer(
259                Window(inquirer_control),
260                filter=~IsDone()
261            )
262        ])
263
264        # key bindings
265        manager = KeyBindingManager.for_prompt()
266
267        @manager.registry.add_binding(Keys.ControlQ, eager=True)
268        @manager.registry.add_binding(Keys.ControlC, eager=True)
269        def _(event):
270            raise KeyboardInterrupt()
271
272        @manager.registry.add_binding(Keys.Down, eager=True)
273        def move_cursor_down(event):
274            def _next():
275                inquirer_control.selected_option_index = (
276                    (inquirer_control.selected_option_index + 1) %
277                    inquirer_control.choice_count)
278            _next()
279            while isinstance(inquirer_control.choices[
280                inquirer_control.selected_option_index][0], Separator) \
281                    or inquirer_control.choices[
282                        inquirer_control.selected_option_index][2]:
283                _next()
284
285        @manager.registry.add_binding(Keys.Up, eager=True)
286        def move_cursor_up(event):
287            def _prev():
288                inquirer_control.selected_option_index = (
289                    (inquirer_control.selected_option_index - 1) %
290                    inquirer_control.choice_count)
291            _prev()
292            while isinstance(inquirer_control.choices[
293                inquirer_control.selected_option_index][0], Separator) \
294                    or inquirer_control.choices[
295                        inquirer_control.selected_option_index][2]:
296                _prev()
297
298        @manager.registry.add_binding(Keys.Enter, eager=True)
299        def set_answer(event):
300            inquirer_control.answered = True
301            event.cli.set_return_value(inquirer_control.get_selection())
302
303        return Application(
304            layout=layout,
305            key_bindings_registry=manager.registry,
306            mouse_support=False,
307            style=style
308        )
309
310
311class InquirerControl(TokenListControl):
312    def __init__(self, choices, **kwargs):
313        self.selected_option_index = 0
314        self.answered = False
315        self.choices = choices
316        self._init_choices(choices)
317        super(InquirerControl, self).__init__(self._get_choice_tokens,
318                                              **kwargs)
319
320    def _init_choices(self, choices, default=None):
321        # helper to convert from question format to internal format
322        self.choices = []  # list (name, value, disabled)
323        searching_first_choice = True
324        for index, choice in enumerate(choices):
325            if isinstance(choice, Separator):
326                self.choices.append((choice, None, None))
327            else:
328                base_string = str if sys.version_info[0] >= 3 else None
329                if isinstance(choice, base_string):
330                    self.choices.append((choice, choice, None))
331                else:
332                    name = choice.get('name')
333                    value = choice.get('value', name)
334                    disabled = choice.get('disabled', None)
335                    self.choices.append((name, value, disabled))
336                if searching_first_choice:
337                    self.selected_option_index = index
338                    searching_first_choice = False
339
340    @property
341    def choice_count(self):
342        return len(self.choices)
343
344    def _get_choice_tokens(self, cli):
345        tokens = []
346        token = Token
347
348        def append(index, choice):
349            selected = (index == self.selected_option_index)
350
351            @_if_mousedown
352            def select_item(cli, mouse_event):
353                # bind option with this index to mouse event
354                self.selected_option_index = index
355                self.answered = True
356
357            tokens.append((token.Pointer if selected else token, ' \u276f '
358                          if selected else '   '))
359            if selected:
360                tokens.append((Token.SetCursorPosition, ''))
361            if choice[2]:  # disabled
362                tokens.append((token.Selected if selected else token,
363                               '- %s (%s)' % (choice[0], choice[2])))
364            else:
365                if isinstance(choice[0], Separator):
366                    tokens.append((token.Separator,
367                                  str(choice[0]),
368                                  select_item))
369                else:
370                    try:
371                        tokens.append((token.Selected if selected else token,
372                                      str(choice[0]), select_item))
373                    except Exception:
374                        tokens.append((token.Selected if selected else
375                                      token, choice[0], select_item))
376            tokens.append((token, '\n'))
377
378        # prepare the select choices
379        for i, choice in enumerate(self.choices):
380            append(i, choice)
381        tokens.pop()  # Remove last newline.
382        return tokens
383
384    def get_selection(self):
385        return self.choices[self.selected_option_index]
386
387
388def _get_style(style_type):
389    style = importlib.import_module('prompt_toolkit.styles')
390    token = importlib.import_module('prompt_toolkit.token')
391    if style_type == 'terminal':
392        return style.style_from_dict({
393            token.Token.Separator: '#75c951',
394            token.Token.QuestionMark: '#5F819D',
395            token.Token.Selected: '',  # default
396            token.Token.Pointer: '#FF9D00 bold',  # AWS orange
397            token.Token.Instruction: '',  # default
398            token.Token.Answer: '#FF9D00 bold',  # AWS orange
399            token.Token.Question: 'bold',
400        })
401    if style_type == 'answer':
402        return style.style_from_dict({
403            token.Token.Separator: '#75c951',
404            token.Token.QuestionMark: '#E91E63 bold',
405            token.Token.Selected: '#cc5454',  # default
406            token.Token.Pointer: '#ed9164 bold',
407            token.Token.Instruction: '',  # default
408            token.Token.Answer: '#f44336 bold',
409            token.Token.Question: '',
410        })
411
412    return None
413
414
415def _if_mousedown(handler):
416    def handle_if_mouse_down(cli, mouse_event):
417        mouse_events = importlib.import_module('prompt_toolkit.mouse_events')
418        if mouse_event.event_type == mouse_events.MouseEventTypes.MOUSE_DOWN:
419            return handler(cli, mouse_event)
420        else:
421            return NotImplemented
422
423    return handle_if_mouse_down
424