#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Copyright (c) 2021-2024 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import re
from dataclasses import dataclass
from os import path
from typing import Optional, Dict, Any, Union, List, Set, Sequence

from runner.logger import Log
from runner.options.yaml_document import YamlDocument


@dataclass
class Chapter:
    """
    One named chapter read from the chapters file.
    """
    # Key under which the chapter is declared in the chapters file.
    name: str
    # Entries to include: either names of other chapters or file/folder masks.
    includes: List[str]
    # Entries collected from nested 'exclude' lists; same two forms as includes.
    excludes: List[str]


class IncorrectFileFormatChapterException(Exception):
    """
    Signals that the chapters file does not have the expected structure.
    The text passed in is prefixed with "Incorrect file format: ".
    """
    def __init__(self, chapters_name: str) -> None:
        message = f"Incorrect file format: {chapters_name}"
        super().__init__(message)


class CyclicDependencyChapterException(Exception):
    """
    Signals that a chapter refers, directly or through other chapters, to itself.
    The text passed in is prefixed with "Cyclic dependency: ".
    """
    def __init__(self, item: str) -> None:
        message = f"Cyclic dependency: {item}"
        super().__init__(message)


_LOGGER = logging.getLogger("runner.chapters")


class Chapters:
    """
    Set of chapters loaded from a YAML chapters file.
    A chapter lists included and excluded entries; every entry is either the name
    of another chapter or a file/folder mask relative to a base folder.
    """

    def __init__(self, chapters_file: str):
        """
        Loads chapters from the file and checks that they do not refer to each other cyclically.
        :param chapters_file: path to the YAML file with the chapters description
        """
        self.chapters = self.__parse(chapters_file)
        self.__validate_cycles()

    @staticmethod
    def __parse(chapters_file: str) -> Dict[str, Chapter]:
        """
        Reads the chapters file and converts its 'chapters' list into Chapter objects.
        Problems (top level is not a non-empty dictionary, 'chapters' is not a non-empty list,
        a list element is not a dictionary, a chapter name is repeated) are reported through
        Log.exception_and_raise with IncorrectFileFormatChapterException.
        :param chapters_file: path to the YAML file with the chapters description
        :return: chapters keyed by their names
        """
        result: Dict[str, Chapter] = {}
        yaml_header: Dict[str, Any] = YamlDocument.load(chapters_file)
        if not yaml_header or not isinstance(yaml_header, dict):
            Log.exception_and_raise(_LOGGER, chapters_file, IncorrectFileFormatChapterException)
        yaml_chapters: Optional[List[Any]] = yaml_header.get('chapters')
        if not yaml_chapters or not isinstance(yaml_chapters, list):
            Log.exception_and_raise(_LOGGER, chapters_file, IncorrectFileFormatChapterException)
        for yaml_chapter in yaml_chapters:
            if not isinstance(yaml_chapter, dict):
                Log.exception_and_raise(_LOGGER, chapters_file, IncorrectFileFormatChapterException)
            # Every key of the dictionary is a chapter name, its value is the list of chapter items
            for yaml_name, yaml_items in yaml_chapter.items():
                chapter = Chapters.__parse_chapter(yaml_name, yaml_items, chapters_file)
                if chapter.name in result:
                    Log.exception_and_raise(
                        _LOGGER,
                        f"Chapter '{chapter.name}' already used",
                        IncorrectFileFormatChapterException)
                result[chapter.name] = chapter
        return result

    @staticmethod
    def __parse_item(includes: List[str], excludes: List[str], yaml_item: Union[str, dict]) -> None:
        """
        Sorts one chapter item into includes or excludes. Both lists are modified in place.
        A string goes to includes; a dictionary may only hold the key 'exclude' with a list value,
        whose elements go to excludes. Any other key (or a non-list 'exclude' value) is reported
        through Log.exception_and_raise with IncorrectFileFormatChapterException.
        :param includes: collected include entries
        :param excludes: collected exclude entries
        :param yaml_item: one element of the chapter's list
        """
        if isinstance(yaml_item, str):
            includes.append(yaml_item.strip())
        elif isinstance(yaml_item, dict):
            for sub_name, sub_items in yaml_item.items():
                if sub_name == 'exclude' and isinstance(sub_items, list):
                    # Strip string entries the same way includes are stripped, so that both
                    # lists resolve chapter names and masks identically
                    excludes.extend(
                        sub_item.strip() if isinstance(sub_item, str) else sub_item
                        for sub_item in sub_items
                    )
                else:
                    Log.exception_and_raise(
                        _LOGGER,
                        f"Only 'exclude' is allowed as a nested dictionary: {sub_name}",
                        IncorrectFileFormatChapterException)
        # NOTE: an item that is neither a string nor a dictionary is silently ignored

    @staticmethod
    def __parse_chapter(name: str, yaml_items: Sequence[Union[str, Dict[str, str]]], chapters_file: str) -> Chapter:
        """
        Builds a Chapter from its name and the list of its items.
        A non-list value is reported through Log.exception_and_raise
        with IncorrectFileFormatChapterException.
        :param name: chapter name
        :param yaml_items: list of the chapter items as loaded from YAML
        :param chapters_file: path to the chapters file, used only in the error message
        :return: chapter with collected includes and excludes
        """
        if not isinstance(yaml_items, list):
            # NOTE(review): IncorrectFileFormatChapterException prepends "Incorrect file format: "
            # itself; if Log.exception_and_raise passes this message to the exception constructor
            # the prefix is duplicated - confirm against Log before changing the string
            Log.exception_and_raise(
                _LOGGER,
                f"Incorrect file format: {chapters_file}",
                IncorrectFileFormatChapterException
            )
        includes: List[str] = []
        excludes: List[str] = []
        for yaml_item in yaml_items:
            Chapters.__parse_item(includes, excludes, yaml_item)

        return Chapter(name, includes, excludes)

    @staticmethod
    def __filter_by_mask(mask: str, files: List[str], extension: str) -> List[str]:
        """
        Selects the files matching the mask.
        A mask without '*' that does not end with '.<extension>' is treated as a folder
        and gets '/*' appended. '*' stands for any sequence of characters, every other
        character is matched literally. The mask may match anywhere inside a file path.
        :param mask: file or folder mask
        :param files: file paths to select from
        :param extension: file extension without the leading dot
        :return: matched files in their original order
        """
        if '*' not in mask and not mask.endswith(f".{extension}"):
            # mask is a folder
            mask = f'{mask}/*'
        # re.escape neutralizes every regex metacharacter (a path may hold '+', '(', '[', ...),
        # then only the escaped '*' is turned back into the 'any characters' wildcard
        pattern = re.compile(re.escape(mask).replace(r'\*', '.*'))
        return [file for file in files if pattern.search(file) is not None]

    def filter_by_chapter(self, chapter_name: str, base_folder: str, files: List[str], extension: str) -> Set[str]:
        """
        Selects the files belonging to the chapter: everything matched by its includes
        minus everything matched by its excludes. Nested chapters are resolved recursively.
        :param chapter_name: name of the chapter
        :param base_folder: folder the masks are joined to
        :param files: file paths to select from
        :param extension: file extension without the leading dot
        :return: set of selected files; empty set if the chapter is unknown
        """
        chapter = self.chapters.get(chapter_name)
        if chapter is None:
            return set()
        included = self.__resolve_items(chapter.includes, base_folder, files, extension)
        excluded = self.__resolve_items(chapter.excludes, base_folder, files, extension)
        return included - excluded

    def __resolve_items(self, items: List[str], base_folder: str, files: List[str], extension: str) -> Set[str]:
        """
        Collects the files matched by the items: an item naming a known chapter is expanded
        through filter_by_chapter, any other item is a mask joined to the base folder.
        :param items: chapter names and/or masks
        :param base_folder: folder the masks are joined to
        :param files: file paths to select from
        :param extension: file extension without the leading dot
        :return: union of the files matched by all items
        """
        resolved: Set[str] = set()
        for item in items:
            if item in self.chapters:
                resolved.update(self.filter_by_chapter(item, base_folder, files, extension))
            else:
                mask = path.join(base_folder, item)
                resolved.update(Chapters.__filter_by_mask(mask, files, extension))
        return resolved

    def __validate_cycles(self) -> None:
        """
        Walks the references of every chapter looking for a chapter that leads back to itself
        :raise: CyclicDependencyChapterException if a cyclic dependency found
        Normal finish means that no cycle found
        """
        # Names of the chapters on the path currently being walked
        seen_chapters: List[str] = []
        for name, chapter in self.chapters.items():
            seen_chapters.append(name)
            self.__check_cycle(chapter, seen_chapters)
            seen_chapters.pop()

    def __check_cycle(self, chapter: Chapter, seen_chapters: List[str]) -> None:
        """
        Checks if items contains any name from seen
        :param chapter: investigated chapter
        :param seen_chapters: array of seen items' names
        :raise: CyclicDependencyChapterException if a name of nested chapter is in seen names
        Normal finish means that no cycle found
        """
        for item in chapter.includes + chapter.excludes:
            if item not in self.chapters:
                # Not a chapter name, so it is a mask and cannot form a cycle
                continue
            if item in seen_chapters:
                Log.exception_and_raise(_LOGGER, item, CyclicDependencyChapterException)
            # Push the nested chapter for the duration of its own walk only
            seen_chapters.append(item)
            self.__check_cycle(self.chapters[item], seen_chapters)
            seen_chapters.pop()