#!/usr/bin/env python
# -*- coding: utf-8 -*-

#
# Copyright (c) 2020 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import
from __future__ import print_function

from prompt_toolkit.shortcuts import run_application

from hb_internal.cts.checkbox import question as checkbox_question
from hb_internal.cts.list import question as list_question
from hb_internal.common.utils import hb_error
from hb_internal.common.utils import hb_warning


def prompt(questions, answers=None, **kwargs):
    """Ask a sequence of interactive questions and collect the answers.

    Args:
        questions: A single question dict or an iterable of question dicts.
            Each question must provide ``type`` (``"checkbox"`` or
            ``"list"``), ``name`` and ``message``. Optional keys handled
            here are ``when`` (callable taking the answers so far; the
            question is skipped when it returns a falsy value), ``filter``
            (callable applied to the answer before it is stored), and
            ``choices`` / ``default``, which may be callables taking the
            answers so far. Remaining keys are forwarded to the question
            factory.
        answers: Optional dict of already known answers; it is passed to the
            ``when`` / ``choices`` / ``default`` callables.
        **kwargs: ``patch_stdout``, ``return_asyncio_coroutine``,
            ``true_color``, ``refresh_interval`` and ``eventloop`` are
            forwarded to ``run_application``; ``keyboard_interrupt_msg`` is
            the text shown on Ctrl-C. Any other keyword is used as a default
            for every question (question keys take precedence).

    Returns:
        A dict mapping question names to answers, or an empty dict when the
        user interrupts with Ctrl-C.

    Raises:
        ValueError: If the question type is unknown, if ``when`` or
            ``filter`` is not callable, or if one of them raises.
        KeyError: If a question lacks ``type``, ``name`` or ``message``.
    """
    if isinstance(questions, dict):
        questions = [questions]
    answers = answers or {}

    patch_stdout = kwargs.pop('patch_stdout', False)
    return_asyncio_coroutine = kwargs.pop('return_asyncio_coroutine', False)
    true_color = kwargs.pop('true_color', False)
    refresh_interval = kwargs.pop('refresh_interval', 0)
    eventloop = kwargs.pop('eventloop', None)
    kbi_msg = kwargs.pop('keyboard_interrupt_msg', 'Cancelled by user')

    for question in questions:
        # Bound before the try block so the AttributeError handler below can
        # always format its message, even if the failure happens early.
        question_type = None
        try:
            # Work on a merged copy so the caller's question dict is never
            # modified; question keys override the shared keyword defaults.
            _kwargs = {}
            _kwargs.update(kwargs)
            _kwargs.update(question)

            # Dynamic choices are resolved into the copy, so a reused
            # question list re-evaluates them on every call.
            choices = _kwargs.get('choices')
            if callable(choices):
                _kwargs['choices'] = choices(answers)

            question_type = _kwargs.pop('type')
            name = _kwargs.pop('name')
            message = _kwargs.pop('message')
            when = _kwargs.pop('when', None)
            question_filter = _kwargs.pop('filter', None)

            if when:
                # at least a little sanity check!
                if not callable(when):
                    raise ValueError('\'when\' needs to be function that '
                                     'accepts a dict argument')
                try:
                    should_ask = when(answers)
                except Exception as error:
                    raise ValueError(
                        'Problem in \'when\' check of %s question: %s' %
                        (name, error))
                if not should_ask:
                    continue

            # Validate the filter before showing the prompt so a bad
            # definition fails fast instead of after user interaction.
            if question_filter and not callable(question_filter):
                raise ValueError('\'filter\' needs to be function that '
                                 'accepts an argument')

            default = _kwargs.get('default')
            if callable(default):
                _kwargs['default'] = default(answers)

            if question_type == "checkbox":
                application = checkbox_question(message, **_kwargs)
            elif question_type == "list":
                application = list_question(message, **_kwargs)
            else:
                # Without this branch ``application`` would be unbound (or
                # stale from the previous question).
                raise ValueError('No question type \'%s\'' % question_type)

            answer = run_application(
                application,
                patch_stdout=patch_stdout,
                return_asyncio_coroutine=return_asyncio_coroutine,
                true_color=true_color,
                refresh_interval=refresh_interval,
                eventloop=eventloop)

            if answer is not None:
                if question_filter:
                    try:
                        answer = question_filter(answer)
                    except Exception as error:
                        raise ValueError('Problem processing \'filter\' of '
                                         '{} question: {}'.format(name, error))
                answers[name] = answer
        except AttributeError as attr_error:
            hb_error(attr_error)
            raise ValueError('No question type \'%s\'' % question_type)
        except KeyboardInterrupt:
            hb_warning('')
            hb_warning(kbi_msg)
            hb_warning('')
            return {}
    return answers