• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Nils Weiss <nils@we155.de>
5
6# scapy.contrib.description = Graph library for AutomotiveTestCaseExecutor
7# scapy.contrib.status = library
8
9from collections import defaultdict
10
11from scapy.contrib.automotive import log_automotive
12from scapy.contrib.automotive.ecu import EcuState
13
14# Typing imports
15from typing import (
16    Union,
17    List,
18    Optional,
19    Dict,
20    Tuple,
21    Set,
22    TYPE_CHECKING,
23)
24
25_Edge = Tuple[EcuState, EcuState]
26
27if TYPE_CHECKING:
28    from scapy.contrib.automotive.scanner.test_case import _TransitionTuple
29
30
31class Graph(object):
32    """
33    Helper object to store a directional Graph of EcuState objects. An edge in
34    this Graph is defined as Tuple of two EcuStates. A node is defined as
35    EcuState.
36
37    self.edges is a dict of all possible next nodes
38    e.g. {'X': ['A', 'B', 'C', 'E'], ...}
39
40    self.__transition_functions has all the transition_functions between
41    two nodes, with the two nodes as a tuple as the key
42    e.g. {('X', 'A'): 7, ('X', 'B'): 2, ...}
43    """
44    def __init__(self):
45        # type: () -> None
46        self.edges = defaultdict(list)  # type: Dict[EcuState, List[EcuState]]
47        self.__transition_functions = {}  # type: Dict[_Edge, Optional["_TransitionTuple"]]  # noqa: E501
48        self.weights = {}  # type: Dict[_Edge, int]
49
50    def add_edge(self, edge, transition_function=None):
51        # type: (_Edge, Optional["_TransitionTuple"]) -> None
52        """
53        Inserts new edge in directional graph
54        :param edge: edge from node to node
55        :param transition_function: tuple with enter and cleanup function
56        """
57        self.edges[edge[0]].append(edge[1])
58        self.weights[edge] = 1
59        self.__transition_functions[edge] = transition_function
60
61    def get_transition_tuple_for_edge(self, edge):
62        # type: (_Edge) -> Optional["_TransitionTuple"]  # noqa: E501
63        """
64        Returns a TransitionTuple for an Edge, if available.
65        :param edge: Tuple of EcuStates
66        :return: According TransitionTuple or None
67        """
68        return self.__transition_functions.get(edge, None)
69
70    def downrate_edge(self, edge):
71        # type: (_Edge) -> None
72        """
73        Increases the weight of an Edge
74        :param edge: Edge on which the weight has t obe increased
75        """
76        try:
77            self.weights[edge] += 1
78        except KeyError:
79            pass
80
81    @property
82    def transition_functions(self):
83        # type: () -> Dict[_Edge, Optional["_TransitionTuple"]]
84        """
85        Get the dict of all TransistionTuples
86        :return:
87        """
88        return self.__transition_functions
89
90    @property
91    def nodes(self):
92        # type: () -> Union[List[EcuState], Set[EcuState]]
93        """
94        Get a set of all nodes in this Graph
95        :return:
96        """
97        return set([n for k, p in self.edges.items() for n in p + [k]])
98
99    def render(self, filename="SystemStateGraph.gv", view=True):
100        # type: (str, bool) -> None
101        """
102        Renders this Graph as PDF, if `graphviz` is installed.
103
104        :param filename: A filename for the rendered PDF.
105        :param view: If True, rendered file will be opened.
106        """
107        try:
108            from graphviz import Digraph
109        except ImportError:
110            log_automotive.info("Please install graphviz.")
111            return
112
113        ps = Digraph(name="SystemStateGraph",
114                     node_attr={"fillcolor": "lightgrey",
115                                "style": "filled",
116                                "shape": "box"},
117                     graph_attr={"concentrate": "true"})
118        for n in self.nodes:
119            ps.node(str(n))
120
121        for e, f in self.__transition_functions.items():
122            try:
123                desc = "" if f is None else f[1]["desc"]
124            except (AttributeError, KeyError):
125                desc = ""
126            ps.edge(str(e[0]), str(e[1]), label=desc)
127
128        ps.render(filename, view=view)
129
130    @staticmethod
131    def dijkstra(graph, initial, end):
132        # type: (Graph, EcuState, EcuState) -> List[EcuState]
133        """
134        Compute shortest paths from initial to end in graph
135        Partly from https://benalexkeen.com/implementing-djikstras-shortest-path-algorithm-with-python/  # noqa: E501
136        :param graph: Graph where path is computed
137        :param initial: Start node
138        :param end: End node
139        :return: A path as list of nodes
140        """
141        shortest_paths = {initial: (None, 0)}  # type: Dict[EcuState, Tuple[Optional[EcuState], int]]  # noqa: E501
142        current_node = initial
143        visited = set()
144
145        while current_node != end:
146            visited.add(current_node)
147            destinations = graph.edges[current_node]
148            weight_to_current_node = shortest_paths[current_node][1]
149
150            for next_node in destinations:
151                weight = graph.weights[(current_node, next_node)] + \
152                    weight_to_current_node
153                if next_node not in shortest_paths:
154                    shortest_paths[next_node] = (current_node, weight)
155                else:
156                    current_shortest_weight = shortest_paths[next_node][1]
157                    if current_shortest_weight > weight:
158                        shortest_paths[next_node] = (current_node, weight)
159
160            next_destinations = {node: shortest_paths[node] for node in
161                                 shortest_paths if node not in visited}
162            if not next_destinations:
163                return []
164            # next node is the destination with the lowest weight
165            current_node = min(next_destinations,
166                               key=lambda k: next_destinations[k][1])
167
168        # Work back through destinations in shortest path
169        last_node = shortest_paths[current_node][0]
170        path = [current_node]
171        while last_node is not None:
172            path.append(last_node)
173            last_node = shortest_paths[last_node][0]
174        # Reverse path
175        path.reverse()
176        return path
177