1#!/usr/bin/env python3 2# -- coding: utf-8 -- 3# 4# Copyright (c) 2024-2025 Huawei Device Co., Ltd. 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16# 17import re 18from functools import cached_property 19from os import path 20from typing import Any, Optional, cast 21 22from runner.common_exceptions import FileNotFoundException, InvalidConfiguration 23from runner.logger import Log 24from runner.options.macros import MacroNotExpanded, Macros, ParameterNotFound 25from runner.options.options import IOptions 26from runner.options.options_test_suite import TestSuiteOptions 27from runner.options.step import Step, StepKind 28from runner.utils import get_config_workflow_folder, has_macro, load_config 29from runner.utils import indent as utils_indent 30 31_LOGGER = Log.get_logger(__file__) 32 33 34class WorkflowOptions(IOptions): 35 __WORKFLOW = "workflow" 36 __WORKFLOW_NAME = "workflow-name" 37 __PROPERTIES = "properties" 38 __PARAMETERS = "parameters" 39 __TEST_SUITE = "test-suite" 40 __STEPS = "steps" 41 __IMPORTS = "imports" 42 __TYPE = "type" 43 44 def __init__(self, cfg_content: dict[str, Any], parent_test_suite: TestSuiteOptions, # type: ignore[explicit-any] 45 *, parent_workflow: Optional['WorkflowOptions'] = None) -> None: 46 super().__init__(None) 47 self._parent = parent_test_suite if parent_workflow is None else parent_workflow 48 inner = parent_workflow is not None 49 self.__name = cfg_content[self.__WORKFLOW] if not inner else cfg_content[self.__WORKFLOW_NAME] 50 self.__data = cfg_content[f"{self.__name}.data"] if not inner else cfg_content 51 self.__test_suite = parent_test_suite 52 self.__steps: list[Step] = [] 53 if not inner: 54 self.__parameters = dict((arg_key[(len(self.__name) + len(self.__PARAMETERS) + 2):], arg_value) 55 for arg_key, arg_value in cfg_content.items() 56 if arg_key.startswith(f"{self.__name}.{self.__PARAMETERS}.")) 57 for param, value_in_workflow in self.__parameters.items(): 58 value_in_test_suite = self.__test_suite.get_parameter(param) 59 if value_in_test_suite is not None: 60 self.__parameters[param] = value_in_test_suite 61 value_in_workflow = value_in_test_suite 62 self.__parameters[param] = self.__expand_macro_for_param(value_in_workflow, param) 63 else: 64 self.__parameters = cfg_content[self.__PARAMETERS] 65 self.__load_steps(self.__data[self.__STEPS]) 66 67 def __str__(self) -> str: 68 indent = 2 69 result = [f"{self.__name}\n{utils_indent(indent)}{self.__PARAMETERS}:"] 70 for param_name in sorted(self.__parameters.keys()): 71 param_value = self.__parameters[param_name] 72 result.append(f"{utils_indent(indent + 1)}{param_name}: {param_value}") 73 result.append(f"{utils_indent(indent)}{self.__STEPS}:") 74 for step in self.__steps: 75 result.append(str(step)) 76 77 return "\n".join(result) 78 79 @staticmethod 80 def __check_type(step_type: StepKind, actual_count: int, expected_max_count: int | None) -> None: 81 if expected_max_count is not None and actual_count > expected_max_count: 82 raise InvalidConfiguration( 83 f"Property 'step-type: {step_type.value}' can be set at only one step, " 84 f"but it is set at {actual_count} steps.") 85 86 @cached_property 87 def name(self) -> str: 88 return str(self.__name) 89 90 @cached_property 91 def steps(self) -> list[Step]: 92 return self.__steps 93 94 @cached_property 95 def parameters(self) -> dict[str, Any]: # type: ignore[explicit-any] 96 return self.__parameters 97 98 def get_command_line(self) -> str: 99 options = ' '.join([ 100 ]) 101 options_str = re.sub(r'\s+', ' ', options, flags=re.IGNORECASE | re.DOTALL) 102 103 return options_str 104 105 def get_parameter(self, key: str, default: Any | None = None) -> Any | None: # type: ignore[explicit-any] 106 return self.__parameters.get(key, default) 107 108 def check_binary_artifacts(self) -> None: 109 for step in self.steps: 110 if step.executable_path is not None and not step.executable_path.is_file(): 111 raise FileNotFoundException( 112 f"Specified binary at {step.executable_path} was not found") 113 114 def check_types(self) -> None: 115 types: dict[StepKind, int] = {} 116 for step in self.steps: 117 types[step.step_kind] = types.get(step.step_kind, 0) + 1 118 self.__check_type(StepKind.COMPILER, types.get(StepKind.COMPILER, 0), 1) 119 self.__check_type(StepKind.VERIFIER, types.get(StepKind.VERIFIER, 0), 1) 120 self.__check_type(StepKind.AOT, types.get(StepKind.AOT, 0), 1) 121 self.__check_type(StepKind.RUNTIME, types.get(StepKind.RUNTIME, 0), None) 122 123 def pretty_str(self) -> str: 124 result: list[str] = [step.pretty_str() for step in self.steps if str(step.executable_path) and step.enabled] 125 return '\n'.join(result) 126 127 def __expand_macro_for_param(self, value_in_workflow: str | list, param: str) -> str | list: 128 if (isinstance(value_in_workflow, str) and has_macro(value_in_workflow) and 129 not self.__test_suite.is_defined_in_collections(param)): 130 return self.__expand_macro_for_str(value_in_workflow) 131 if isinstance(value_in_workflow, list): 132 return self.__expand_macro_for_list(value_in_workflow) 133 return value_in_workflow 134 135 def __prepare_imported_configs(self, imported_configs: dict[str, dict[str, str]]) -> None: 136 for config_name, config_content in imported_configs.items(): 137 config_name = str(path.join(get_config_workflow_folder(), f"{config_name}.yaml")) 138 args = {} 139 for param, param_value in config_content.items(): 140 args.update(self.__prepare_imported_config(param, param_value)) 141 self.__load_imported_config(config_name, args) 142 143 def __prepare_imported_config(self, param: str, param_value: Any) -> dict[str, Any]: # type: ignore[explicit-any] 144 args = {} 145 if isinstance(param_value, str) and param_value.find(self.__PARAMETERS) >= 0: 146 param_value = param_value.replace(f"${{{self.__PARAMETERS}.", "").replace("}", "") 147 args[param] = self.__parameters[param_value] 148 elif isinstance(param_value, list): 149 args[param] = self.__prepare_list(param_value) 150 return args 151 152 def __prepare_list(self, param_value: list[str]) -> list[str]: 153 result_list = [] 154 for item in param_value: 155 corrected_item = Macros.correct_macro(item, self) 156 for sub_item in (corrected_item if isinstance(corrected_item, list) else [corrected_item]): 157 if sub_item and sub_item in self.__parameters and self.__parameters[sub_item]: 158 result_list.append(self.__parameters[sub_item]) 159 elif corrected_item: 160 result_list.append(sub_item) 161 return result_list 162 163 def __load_imported_config(self, cfg_path: str, # type: ignore[explicit-any] 164 actual_params: dict[str, Any]) -> None: 165 cfg_content = load_config(str(cfg_path)) 166 params = cast(dict, cfg_content.get(self.__PARAMETERS, {})) 167 for param, _ in params.items(): 168 if param in actual_params: 169 params[param] = actual_params[param] 170 workflow_options = WorkflowOptions(cfg_content, self.__test_suite, parent_workflow=self) 171 for step in workflow_options.steps: 172 names = [st.name for st in self.__steps] 173 if step.name not in [names]: 174 self.__steps.append(step) 175 for param_name, param_value in workflow_options.parameters.items(): 176 if param_name not in self.__parameters: 177 self.__parameters[param_name] = param_value 178 179 def __load_steps(self, steps: dict[str, dict]) -> None: 180 for step_name, step_content in steps.items(): 181 if step_name == self.__IMPORTS: 182 self.__prepare_imported_configs(step_content) 183 else: 184 self.__load_step(step_name, step_content) 185 186 def __load_step(self, step_name: str, step_content: dict[str, str | list]) -> None: 187 _LOGGER.all(f"Going to load step '{step_name}'") 188 for (step_item, step_value) in step_content.items(): 189 if isinstance(step_value, str): 190 step_content[step_item] = Macros.correct_macro(step_value, self) 191 new_args = [] 192 for arg in step_content['args']: 193 arg = Macros.correct_macro(arg, self) \ 194 if not self.__test_suite.is_defined_in_collections(arg) else arg 195 if isinstance(arg, list): 196 new_args.extend(arg) 197 else: 198 new_args.append(arg) 199 step_content['args'] = new_args 200 step = Step(step_name, step_content) 201 self.__steps.append(step) 202 203 def __expand_macro_for_str(self, value_in_workflow: str) -> str | list[str]: 204 try: 205 return Macros.correct_macro(value_in_workflow, self) 206 except ParameterNotFound as pnf: 207 _LOGGER.all(str(pnf)) 208 except MacroNotExpanded as pnf: 209 _LOGGER.all(str(pnf)) 210 return value_in_workflow 211 212 def __expand_macro_for_list(self, value_in_workflow: list) -> str | list[str]: 213 expanded_in_workflow: list[str] = [] 214 for value in value_in_workflow: 215 try: 216 expanded_value = Macros.correct_macro(value, self) 217 except (ParameterNotFound, MacroNotExpanded) as pnf: 218 _LOGGER.all(str(pnf)) 219 return value_in_workflow 220 if isinstance(expanded_value, list): 221 expanded_in_workflow.extend(expanded_value) 222 else: 223 expanded_in_workflow.append(expanded_value) 224 return expanded_in_workflow 225