1#!/usr/bin/env python 2# -*- coding: utf-8 -*- 3 4# 5# Copyright (c) 2020 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17 18import os 19import importlib 20try: 21 from queue import Queue 22except ImportError: 23 from Queue import Queue 24from collections import defaultdict 25 26from hb_internal.common.utils import read_json_file 27 28 29def get_style(style_type): 30 style = importlib.import_module('prompt_toolkit.styles') 31 token = importlib.import_module('prompt_toolkit.token') 32 if style_type == 'terminal': 33 return style.style_from_dict({ 34 token.Token.Separator: '#75c951', 35 token.Token.QuestionMark: '#5F819D', 36 token.Token.Selected: '', # default 37 token.Token.Pointer: '#FF9D00 bold', # AWS orange 38 token.Token.Instruction: '', # default 39 token.Token.Answer: '#FF9D00 bold', # AWS orange 40 token.Token.Question: 'bold', 41 }) 42 if style_type == 'answer': 43 return style.style_from_dict({ 44 token.Token.Separator: '#75c951', 45 token.Token.QuestionMark: '#E91E63 bold', 46 token.Token.Selected: '#cc5454', # default 47 token.Token.Pointer: '#ed9164 bold', 48 token.Token.Instruction: '', # default 49 token.Token.Answer: '#f44336 bold', 50 token.Token.Question: '', 51 }) 52 53 return None 54 55 56def if_mousedown(handler): 57 def handle_if_mouse_down(cli, mouse_event): 58 mouse_events = importlib.import_module('prompt_toolkit.mouse_events') 59 if mouse_event.event_type == mouse_events.MouseEventTypes.MOUSE_DOWN: 60 return handler(cli, mouse_event) 61 else: 62 return NotImplemented 63 64 return handle_if_mouse_down 65 66 67def get_deps(platform_json): 68 platform = read_json_file(platform_json) 69 subsystem_dict = {} 70 component_deps = defaultdict(list) 71 component_targets = {} 72 component_dirs = {} 73 for subsystem in platform['subsystems']: 74 subsystem_dict[subsystem['subsystem']] = [] 75 for component in subsystem['components']: 76 cname = component['component'] 77 subsystem_dict[subsystem['subsystem']].append(cname) 78 if 'components' in component['deps']: 79 deps = component['deps']['components'] 80 if cname in deps: 81 deps.remove(cname) 82 else: 83 deps = [] 84 component_deps[cname] = deps 85 component_targets[cname] = component['targets'] 86 component_dirs[cname] = [ 87 os.path.join(os.path.dirname(platform_json), os.pardir, 88 os.pardir, os.pardir, os.pardir, path) 89 for path in component['dirs'] 90 ] 91 92 return subsystem_dict, component_deps, component_targets, component_dirs 93 94 95def select_node(node, selected, nodes_from, deps): 96 queue = Queue() 97 queue.put(node) 98 nodes_from[node].append(node) 99 100 while not queue.empty(): 101 now_node = queue.get() 102 if now_node not in selected: 103 selected.append(now_node) 104 for dep in deps.get(now_node, []): 105 if now_node != dep and dep not in selected: 106 queue.put(dep) 107 nodes_from[dep].append(node) 108 109 110def deselect_node(node, selected, nodes_from, deps): 111 queue = Queue() 112 queue.put(node) 113 node_list = [] 114 115 while not queue.empty(): 116 now_node = queue.get() 117 for each_node in nodes_from[now_node]: 118 queue.put(each_node) 119 nodes_from[now_node].clear() 120 if now_node in selected: 121 selected.remove(now_node) 122 node_list.append(now_node) 123 124 [queue.put(n) for n in node_list] 125 while not queue.empty(): 126 now_node = queue.get() 127 for dep in deps.get(now_node, []): 128 if dep not in selected: 129 continue 130 nodes_from[dep] = [n for n in nodes_from[dep] if n in selected] 131 if not len(nodes_from[dep]): 132 selected.remove(dep) 133 queue.put(dep) 134 135 136def get_deps_list(comp, deps): 137 queue = Queue() 138 visited = set() 139 deps_list = [comp] 140 queue.put(comp) 141 142 while not queue.empty(): 143 node = queue.get() 144 for index, dep_comp in enumerate(deps[node]): 145 if dep_comp in visited: 146 continue 147 deps_list.append(dep_comp) 148 queue.put(dep_comp) 149 visited.add(dep_comp) 150 151 return deps_list 152 153 154def get_support_product(product_path): 155 product_dict = defaultdict(list) 156 for product in os.listdir(product_path): 157 product_json = os.path.join(product_path, product) 158 product_content = read_json_file(product_json) 159 board = product_content.get('board') 160 kernel = product_content.get('kernel') 161 platform = "{}_{}".format(board, kernel) 162 product_dict[platform].append(product.strip('.json')) 163 164 return product_dict 165 166 167def check_path(dep, path): 168 dep = dep[:-1] if dep.endswith("/") else dep 169 path = path[:-1] if path.endswith("/") else path 170 171 if len(dep) > len(path): 172 path_max = dep 173 path_min = path 174 else: 175 path_max = path 176 path_min = dep 177 178 if path_min in path_max: 179 path_sub = path_max.replace(path_min, "") 180 if path_sub == "": 181 return True 182 if path_sub.startswith('/') or path_sub.startswith(':'): 183 return True 184 return False 185 186 187class Separator(object): 188 line = '-' * 15 189 190 def __init__(self, line=None): 191 if line: 192 self.line = f'\n{line}' 193 194 def __str__(self): 195 return self.line 196