1# SPDX-License-Identifier: GPL-2.0-only 2# This file is part of Scapy 3# See https://scapy.net/ for more information 4# Copyright (C) Nils Weiss <nils@we155.de> 5 6# scapy.contrib.description = Graph library for AutomotiveTestCaseExecutor 7# scapy.contrib.status = library 8 9from collections import defaultdict 10 11from scapy.contrib.automotive import log_automotive 12from scapy.contrib.automotive.ecu import EcuState 13 14# Typing imports 15from typing import ( 16 Union, 17 List, 18 Optional, 19 Dict, 20 Tuple, 21 Set, 22 TYPE_CHECKING, 23) 24 25_Edge = Tuple[EcuState, EcuState] 26 27if TYPE_CHECKING: 28 from scapy.contrib.automotive.scanner.test_case import _TransitionTuple 29 30 31class Graph(object): 32 """ 33 Helper object to store a directional Graph of EcuState objects. An edge in 34 this Graph is defined as Tuple of two EcuStates. A node is defined as 35 EcuState. 36 37 self.edges is a dict of all possible next nodes 38 e.g. {'X': ['A', 'B', 'C', 'E'], ...} 39 40 self.__transition_functions has all the transition_functions between 41 two nodes, with the two nodes as a tuple as the key 42 e.g. {('X', 'A'): 7, ('X', 'B'): 2, ...} 43 """ 44 def __init__(self): 45 # type: () -> None 46 self.edges = defaultdict(list) # type: Dict[EcuState, List[EcuState]] 47 self.__transition_functions = {} # type: Dict[_Edge, Optional["_TransitionTuple"]] # noqa: E501 48 self.weights = {} # type: Dict[_Edge, int] 49 50 def add_edge(self, edge, transition_function=None): 51 # type: (_Edge, Optional["_TransitionTuple"]) -> None 52 """ 53 Inserts new edge in directional graph 54 :param edge: edge from node to node 55 :param transition_function: tuple with enter and cleanup function 56 """ 57 self.edges[edge[0]].append(edge[1]) 58 self.weights[edge] = 1 59 self.__transition_functions[edge] = transition_function 60 61 def get_transition_tuple_for_edge(self, edge): 62 # type: (_Edge) -> Optional["_TransitionTuple"] # noqa: E501 63 """ 64 Returns a TransitionTuple for an Edge, if available. 65 :param edge: Tuple of EcuStates 66 :return: According TransitionTuple or None 67 """ 68 return self.__transition_functions.get(edge, None) 69 70 def downrate_edge(self, edge): 71 # type: (_Edge) -> None 72 """ 73 Increases the weight of an Edge 74 :param edge: Edge on which the weight has t obe increased 75 """ 76 try: 77 self.weights[edge] += 1 78 except KeyError: 79 pass 80 81 @property 82 def transition_functions(self): 83 # type: () -> Dict[_Edge, Optional["_TransitionTuple"]] 84 """ 85 Get the dict of all TransistionTuples 86 :return: 87 """ 88 return self.__transition_functions 89 90 @property 91 def nodes(self): 92 # type: () -> Union[List[EcuState], Set[EcuState]] 93 """ 94 Get a set of all nodes in this Graph 95 :return: 96 """ 97 return set([n for k, p in self.edges.items() for n in p + [k]]) 98 99 def render(self, filename="SystemStateGraph.gv", view=True): 100 # type: (str, bool) -> None 101 """ 102 Renders this Graph as PDF, if `graphviz` is installed. 103 104 :param filename: A filename for the rendered PDF. 105 :param view: If True, rendered file will be opened. 106 """ 107 try: 108 from graphviz import Digraph 109 except ImportError: 110 log_automotive.info("Please install graphviz.") 111 return 112 113 ps = Digraph(name="SystemStateGraph", 114 node_attr={"fillcolor": "lightgrey", 115 "style": "filled", 116 "shape": "box"}, 117 graph_attr={"concentrate": "true"}) 118 for n in self.nodes: 119 ps.node(str(n)) 120 121 for e, f in self.__transition_functions.items(): 122 try: 123 desc = "" if f is None else f[1]["desc"] 124 except (AttributeError, KeyError): 125 desc = "" 126 ps.edge(str(e[0]), str(e[1]), label=desc) 127 128 ps.render(filename, view=view) 129 130 @staticmethod 131 def dijkstra(graph, initial, end): 132 # type: (Graph, EcuState, EcuState) -> List[EcuState] 133 """ 134 Compute shortest paths from initial to end in graph 135 Partly from https://benalexkeen.com/implementing-djikstras-shortest-path-algorithm-with-python/ # noqa: E501 136 :param graph: Graph where path is computed 137 :param initial: Start node 138 :param end: End node 139 :return: A path as list of nodes 140 """ 141 shortest_paths = {initial: (None, 0)} # type: Dict[EcuState, Tuple[Optional[EcuState], int]] # noqa: E501 142 current_node = initial 143 visited = set() 144 145 while current_node != end: 146 visited.add(current_node) 147 destinations = graph.edges[current_node] 148 weight_to_current_node = shortest_paths[current_node][1] 149 150 for next_node in destinations: 151 weight = graph.weights[(current_node, next_node)] + \ 152 weight_to_current_node 153 if next_node not in shortest_paths: 154 shortest_paths[next_node] = (current_node, weight) 155 else: 156 current_shortest_weight = shortest_paths[next_node][1] 157 if current_shortest_weight > weight: 158 shortest_paths[next_node] = (current_node, weight) 159 160 next_destinations = {node: shortest_paths[node] for node in 161 shortest_paths if node not in visited} 162 if not next_destinations: 163 return [] 164 # next node is the destination with the lowest weight 165 current_node = min(next_destinations, 166 key=lambda k: next_destinations[k][1]) 167 168 # Work back through destinations in shortest path 169 last_node = shortest_paths[current_node][0] 170 path = [current_node] 171 while last_node is not None: 172 path.append(last_node) 173 last_node = shortest_paths[last_node][0] 174 # Reverse path 175 path.reverse() 176 return path 177