• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3
4# Copyright (c) 2021-2024 Huawei Device Co., Ltd.
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18import logging
19from typing import Dict, Optional, List, Union, Any
20
21import yaml
22from runner import utils
23from runner.logger import Log
24
25_LOGGER = logging.getLogger("runner.options.yaml_document")
26
27
class YamlDocument:
    """Configuration document assembled by merging one or more YAML files.

    The merged mapping is kept in ``_document``. Every noteworthy decision
    taken while merging (list union, value replacement, type conflict) is
    recorded as a human-readable message in ``_warnings``.
    """

    def __init__(self) -> None:
        super().__init__()
        # Merged configuration; remains None until a merge supplies data.
        self._document: Optional[Dict[str, Any]] = None
        # Messages collected while merging configs.
        self._warnings: List[str] = []

    @staticmethod
    def load(config_path: str) -> Dict[str, Any]:
        """Parse the YAML file at ``config_path`` with ``yaml.safe_load``.

        A ``yaml.YAMLError`` raised by the parser is reported through
        ``Log.exception_and_raise``.

        NOTE(review): ``yaml.safe_load`` yields ``None`` for an empty
        document, so the result may not be a dict despite the annotation;
        ``merge`` ignores such a result.
        """
        with open(config_path, "r", encoding="utf-8") as stream:
            try:
                data: Dict[str, Any] = yaml.safe_load(stream)
                return data
            except yaml.YAMLError as exc:
                Log.exception_and_raise(_LOGGER, str(exc), yaml.YAMLError)

    @staticmethod
    def save(config_path: str, data: Dict[str, Any]) -> None:
        """Dump ``data`` as YAML (4-space indent) and pass the text to ``utils.write_2_file``."""
        data_to_save = yaml.dump(data, indent=4)
        utils.write_2_file(config_path, data_to_save)

    @staticmethod
    def _union_preserving_order(first: List[Any], second: List[Any]) -> List[Any]:
        """Return a new list with the distinct items of ``first`` then ``second``.

        Items are compared by equality instead of being placed in a ``set``,
        so unhashable items (e.g. mappings) are accepted and the first-seen
        order is kept. Neither input list is modified.
        """
        merged: List[Any] = []
        for item in (*first, *second):
            if item not in merged:
                merged.append(item)
        return merged

    def load_configs(self, config_paths: Optional[List[str]]) -> None:
        """Load every file in ``config_paths`` in order and merge it into the document.

        ``None`` is accepted and means there is nothing to load.
        """
        if config_paths is None:
            return
        for config_path in config_paths:
            self.merge(config_path, self.load(config_path))

    def document(self) -> Optional[Dict[str, Any]]:
        """Return the merged document, or ``None`` if nothing has been merged yet."""
        return self._document

    def warnings(self) -> List[str]:
        """Return the list of messages collected during merges."""
        return self._warnings

    # We use Log.exception_and_raise which throws exception. no need in return
    # pylint: disable=inconsistent-return-statements
    def get_value_by_path(self, yaml_path: str) -> Optional[Union[int, bool, str, List[str]]]:
        """Look up a value by a dot-separated path such as ``"general.build"``.

        Returns ``None`` when any path component is missing or when an
        intermediate value is not a non-empty mapping. A found value is
        returned only if it is ``None``, a bool, an int, a list or a str;
        any other type is reported through ``Log.exception_and_raise``.
        """
        yaml_parts = yaml_path.split(".")
        current: Any = self._document
        for part in yaml_parts:
            if current and isinstance(current, dict) and part in current:
                current = current.get(part)
            else:
                return None
        if current is None or isinstance(current, (bool, int, list, str)):
            return current

        Log.exception_and_raise(_LOGGER, f"Unsupported value type '{type(current)}' for '{yaml_path}'")

    def merge(self, config_path: str, data: Dict[str, Any]) -> None:
        """Merge ``data`` (loaded from ``config_path``) into the document.

        The first merged mapping becomes the document itself; later ones are
        merged key by key. ``None`` data (what an empty YAML file parses to)
        is ignored instead of failing while iterating over it.
        """
        if data is None:
            return
        if self._document is None:
            self._document = data
            return
        self.__merge_level(config_path, "", data, self._document)

    def __merge_level(self, config_path: str, parent_key: str, current_data: Dict[str, Any],
                      parent_data: Dict[str, Any]) -> None:
        """Merge one nesting level of ``current_data`` into ``parent_data`` in place.

        ``parent_key`` is the dotted path of this level and is used only to
        build warning messages. Rules, applied per key:

        * key absent in ``parent_data``: the new value is copied over;
        * both values are non-empty mappings: merged recursively;
        * both values are non-empty lists: replaced by their union, with a warning;
        * the value types differ: the existing value is kept, with a warning;
        * otherwise a differing value replaces the existing one, with a warning.
        """
        for key in current_data:
            if key not in parent_data:
                parent_data[key] = current_data[key]
                continue
            current_value = current_data[key]
            parent_value = parent_data[key]
            new_parent_key = f"{parent_key}.{key}" if parent_key else key
            # CC-OFFNXT(G.CTL.03) temporary suppress
            if current_value and isinstance(current_value, dict) and parent_value and isinstance(parent_value, dict):
                self.__merge_level(config_path, new_parent_key, current_value, parent_value)
                continue
            # CC-OFFNXT(G.CTL.03) temporary suppress
            if current_value and isinstance(current_value, list) and parent_value and isinstance(parent_value, list):
                self._warnings.append(f"Attention: config file '{config_path}' merges value "
                                      f"`{new_parent_key}:{current_value}` with `{parent_value}` ")
                # Build a fresh list: a set-based union would fail on unhashable
                # items and would make the resulting order unpredictable.
                parent_data[key] = self._union_preserving_order(parent_value, current_value)
                continue
            current_type = type(current_value)
            parent_type = type(parent_value)
            if current_type != parent_type:
                # Conflicting types: keep the value that is already there.
                self._warnings.append(
                    f"Attention: config file '{config_path}' for key '{new_parent_key}' provides "
                    f"different type {current_type}. Before: {parent_type}")
                continue
            if current_value == parent_value:
                continue
            self._warnings.append(
                f"Attention: config file '{config_path}' replaces value '{new_parent_key}:{parent_value}' "
                f"with '{current_value}'")
            parent_data[key] = current_value
116