1# SPDX-License-Identifier: GPL-2.0-only 2# This file is part of Scapy 3# See https://scapy.net/ for more information 4# Copyright (C) Nils Weiss <nils@we155.de> 5 6# scapy.contrib.description = TestCase base class definitions 7# scapy.contrib.status = library 8 9 10import abc 11from collections import defaultdict 12 13from scapy.utils import make_lined_table, SingleConversationSocket 14from scapy.supersocket import SuperSocket 15from scapy.contrib.automotive.scanner.graph import _Edge 16from scapy.contrib.automotive.ecu import EcuState, EcuResponse 17from scapy.error import Scapy_Exception 18 19 20# Typing imports 21from typing import ( 22 Any, 23 Union, 24 List, 25 Optional, 26 Dict, 27 Tuple, 28 Set, 29 Callable, 30 TYPE_CHECKING, 31) 32if TYPE_CHECKING: 33 from scapy.contrib.automotive.scanner.configuration import AutomotiveTestCaseExecutorConfiguration # noqa: E501 34 35 36# type definitions 37_SocketUnion = Union[SuperSocket, SingleConversationSocket] 38_TransitionCallable = Callable[[_SocketUnion, "AutomotiveTestCaseExecutorConfiguration", Dict[str, Any]], bool] # noqa: E501 39_CleanupCallable = Callable[[_SocketUnion, "AutomotiveTestCaseExecutorConfiguration"], bool] # noqa: E501 40_TransitionTuple = Tuple[_TransitionCallable, Dict[str, Any], Optional[_CleanupCallable]] # noqa: E501 41 42 43class AutomotiveTestCaseABC(metaclass=abc.ABCMeta): 44 """ 45 Base class for "TestCase" objects. In automotive scanners, these TestCase 46 objects are used for individual tasks, for example enumerating over one 47 kind of functionality of the protocol. It is also possible, that 48 these TestCase objects execute complex tests on an ECU. 49 The TestCaseExecuter object has a list of TestCases. The executer 50 manipulates a device under test (DUT), to enter a certain state. In this 51 state, the TestCase object gets executed. 52 """ 53 54 _supported_kwargs = {} # type: Dict[str, Tuple[Any, Optional[Callable[[Any], bool]]]] # noqa: E501 55 _supported_kwargs_doc = "" 56 57 @abc.abstractmethod 58 def has_completed(self, state): 59 # type: (EcuState) -> bool 60 """ 61 Tells if this TestCase was executed for a certain state 62 :param state: State of interest 63 :return: True, if TestCase was executed in the questioned state 64 """ 65 raise NotImplementedError() 66 67 @abc.abstractmethod 68 def pre_execute(self, 69 socket, # type: _SocketUnion 70 state, # type: EcuState 71 global_configuration # type: AutomotiveTestCaseExecutorConfiguration # noqa: E501 72 ): # type: (...) -> None 73 """ 74 Will be executed previously to ``execute``. This function can be used 75 to manipulate the configuration passed to execute. 76 77 :param socket: Socket object with the connection to a DUT 78 :param state: Current state of the DUT 79 :param global_configuration: Configuration of the TestCaseExecutor 80 """ 81 raise NotImplementedError() 82 83 @abc.abstractmethod 84 def execute(self, socket, state, **kwargs): 85 # type: (_SocketUnion, EcuState, Any) -> None 86 """ 87 Executes this TestCase for a given state 88 89 :param socket: Socket object with the connection to a DUT 90 :param state: Current state of the DUT 91 :param kwargs: Local configuration of the TestCasesExecutor 92 :return: 93 """ 94 raise NotImplementedError() 95 96 @abc.abstractmethod 97 def post_execute(self, 98 socket, # type: _SocketUnion 99 state, # type: EcuState 100 global_configuration # type: AutomotiveTestCaseExecutorConfiguration # noqa: E501 101 ): # type: (...) -> None 102 """ 103 Will be executed subsequently to ``execute``. This function can be used 104 for additional evaluations after the ``execute``. 105 106 :param socket: Socket object with the connection to a DUT 107 :param state: Current state of the DUT 108 :param global_configuration: Configuration of the TestCaseExecutor 109 """ 110 raise NotImplementedError() 111 112 @abc.abstractmethod 113 def show(self, dump=False, filtered=True, verbose=False): 114 # type: (bool, bool, bool) -> Optional[str] 115 """ 116 Shows results of TestCase 117 118 :param dump: If True, the results will be returned; If False, the 119 results will be printed. 120 :param filtered: If True, the negative responses will be filtered 121 dynamically. 122 :param verbose: If True, additional information will be provided. 123 :return: test results of TestCase if parameter ``dump`` is True, 124 else ``None`` 125 """ 126 raise NotImplementedError() 127 128 @property 129 @abc.abstractmethod 130 def completed(self): 131 # type: () -> bool 132 """ 133 Tells if this TestCase is completely executed 134 :return: True, if TestCase is completely executed 135 """ 136 raise NotImplementedError 137 138 @property 139 @abc.abstractmethod 140 def supported_responses(self): 141 # type: () -> List[EcuResponse] 142 """ 143 Tells the supported responses in TestCase 144 :return: The list of supported responses 145 """ 146 raise NotImplementedError 147 148 149class AutomotiveTestCase(AutomotiveTestCaseABC): 150 """ Base class for TestCases""" 151 152 _description = "AutomotiveTestCase" 153 _supported_kwargs = AutomotiveTestCaseABC._supported_kwargs 154 _supported_kwargs_doc = AutomotiveTestCaseABC._supported_kwargs_doc 155 156 def __init__(self): 157 # type: () -> None 158 self._state_completed = defaultdict(bool) # type: Dict[EcuState, bool] 159 160 def has_completed(self, state): 161 # type: (EcuState) -> bool 162 return self._state_completed[state] 163 164 @classmethod 165 def check_kwargs(cls, kwargs): 166 # type: (Dict[str, Any]) -> None 167 for k, v in kwargs.items(): 168 if k not in cls._supported_kwargs.keys(): 169 raise Scapy_Exception( 170 "Keyword-Argument %s not supported for %s" % 171 (k, cls.__name__)) 172 ti, vf = cls._supported_kwargs[k] 173 if ti is not None and not isinstance(v, ti): 174 raise Scapy_Exception( 175 "Keyword-Value '%s' is not instance of type %s" % 176 (k, str(ti))) 177 if vf is not None and not vf(v): 178 raise Scapy_Exception( 179 "Validation Error: '%s: %s' is not in the allowed " 180 "value range" % (k, str(v)) 181 ) 182 183 @property 184 def completed(self): 185 # type: () -> bool 186 return all(v for _, v in self._state_completed.items()) 187 188 @property 189 def scanned_states(self): 190 # type: () -> Set[EcuState] 191 """ 192 Helper function to get all scanned states 193 :return: all scanned states 194 """ 195 return set(self._state_completed.keys()) 196 197 def pre_execute(self, socket, state, global_configuration): 198 # type: (_SocketUnion, EcuState, AutomotiveTestCaseExecutorConfiguration) -> None # noqa: E501 199 pass 200 201 def execute(self, socket, state, **kwargs): 202 # type: (_SocketUnion, EcuState, Any) -> None 203 raise NotImplementedError() 204 205 def post_execute(self, socket, state, global_configuration): 206 # type: (_SocketUnion, EcuState, AutomotiveTestCaseExecutorConfiguration) -> None # noqa: E501 207 pass 208 209 def _show_header(self, **kwargs): 210 # type: (Any) -> str 211 s = "\n\n" + "=" * (len(self._description) + 10) + "\n" 212 s += " " * 5 + self._description + "\n" 213 s += "-" * (len(self._description) + 10) + "\n" 214 215 return s + "\n" 216 217 def _show_state_information(self, **kwargs): 218 # type: (Any) -> str 219 completed = [(state, self._state_completed[state]) 220 for state in self.scanned_states] 221 return make_lined_table( 222 completed, lambda x, y: ("Scan state completed", x, y), 223 dump=True) or "" 224 225 def show(self, dump=False, filtered=True, verbose=False): 226 # type: (bool, bool, bool) -> Optional[str] 227 228 s = self._show_header() 229 230 if verbose: 231 s += self._show_state_information() 232 233 if dump: 234 return s + "\n" 235 else: 236 print(s) 237 return None 238 239 240class TestCaseGenerator(metaclass=abc.ABCMeta): 241 @abc.abstractmethod 242 def get_generated_test_case(self): 243 # type: () -> Optional[AutomotiveTestCaseABC] 244 raise NotImplementedError() 245 246 247class StateGenerator(metaclass=abc.ABCMeta): 248 249 @abc.abstractmethod 250 def get_new_edge(self, socket, config): 251 # type: (_SocketUnion, AutomotiveTestCaseExecutorConfiguration) -> Optional[_Edge] # noqa: E501 252 raise NotImplementedError 253 254 @abc.abstractmethod 255 def get_transition_function(self, socket, edge): 256 # type: (_SocketUnion, _Edge) -> Optional[_TransitionTuple] 257 """ 258 259 :param socket: Socket to target 260 :param edge: Tuple of EcuState objects for the requested 261 transition function 262 :return: Returns an optional tuple consisting of a transition function, 263 a keyword arguments dictionary for the transition function 264 and a cleanup function. Both functions 265 take a Socket and the TestCaseExecutor configuration as 266 arguments and return True if the execution was successful. 267 The first function is the state enter function, the second 268 function is a cleanup function 269 """ 270 raise NotImplementedError 271