# SPDX-License-Identifier: GPL-2.0-only
# This file is part of Scapy
# See https://scapy.net/ for more information
# Copyright (C) Nils Weiss <nils@we155.de>

# scapy.contrib.description = Staged AutomotiveTestCase base classes
# scapy.contrib.status = library


from scapy.contrib.automotive import log_automotive
from scapy.contrib.automotive.scanner.graph import _Edge
from scapy.contrib.automotive.ecu import EcuState, EcuResponse, Ecu
from scapy.contrib.automotive.scanner.test_case import AutomotiveTestCaseABC, \
    TestCaseGenerator, StateGenerator, _SocketUnion

# Typing imports
from typing import (
    Any,
    List,
    Optional,
    Dict,
    Callable,
    cast,
    Tuple,
    TYPE_CHECKING,
)
if TYPE_CHECKING:
    from scapy.contrib.automotive.scanner.test_case import _TransitionTuple
    from scapy.contrib.automotive.scanner.configuration import \
        AutomotiveTestCaseExecutorConfiguration

# type definitions
_TestCaseConnectorCallable = \
    Callable[[AutomotiveTestCaseABC, AutomotiveTestCaseABC], Dict[str, Any]]


class StagedAutomotiveTestCase(AutomotiveTestCaseABC, TestCaseGenerator, StateGenerator):  # noqa: E501
    """ Helper object to build a pipeline of TestCases. This allows to combine
    TestCases and to execute them after each other. Custom connector functions
    can be used to exchange and manipulate the configuration of a subsequent
    TestCase.

    :param test_cases: A list of objects following the AutomotiveTestCaseABC
                       interface
    :param connectors: A list of connector functions. A connector function
                       takes two TestCase objects and returns a dictionary
                       which is provided to the second TestCase as kwargs of
                       the execute function. Entries are looked up by stage
                       index; a missing or non-callable entry means that no
                       connector is applied for that stage.


    Example:
        >>> class MyTestCase2(AutomotiveTestCaseABC):
        >>>     pass
        >>>
        >>> class MyTestCase1(AutomotiveTestCaseABC):
        >>>     pass
        >>>
        >>> def connector(testcase1, testcase2):
        >>>     scan_range = len(testcase1.results)
        >>>     return {"verbose": True, "scan_range": scan_range}
        >>>
        >>> tc1 = MyTestCase1()
        >>> tc2 = MyTestCase2()
        >>> pipeline = StagedAutomotiveTestCase([tc1, tc2], [None, connector])
    """

    # Delay the increment of a stage after the current stage is finished.
    # Once the current TestCase reports completion, has_completed() has to
    # be called __delay_stages more times before the stage index is
    # incremented. This ensures, that the current stage is executed for
    # all possible states of the DUT, and no state is missed for the first
    # TestCase.
    __delay_stages = 5

    def __init__(self,
                 test_cases,  # type: List[AutomotiveTestCaseABC]
                 connectors=None  # type: Optional[List[Optional[_TestCaseConnectorCallable]]]  # noqa: E501
                 ):  # type: (...) -> None
        super(StagedAutomotiveTestCase, self).__init__()
        self.__test_cases = test_cases
        self.__connectors = connectors
        # index of the TestCase that is currently executed
        self.__stage_index = 0
        # number of has_completed() calls since the current stage finished
        self.__completion_delay = 0
        # kwargs handed to execute() of the current TestCase
        self.__current_kwargs = None  # type: Optional[Dict[str, Any]]

    def __getitem__(self, item):
        # type: (int) -> AutomotiveTestCaseABC
        """Return the TestCase of the stage with index ``item``."""
        return self.__test_cases[item]

    def __len__(self):
        # type: () -> int
        """Return the number of stages in this pipeline."""
        return len(self.__test_cases)

    # TODO: Fix unit tests and remove this function
    def __reduce__(self):  # type: ignore
        """Reduce this object without its connector functions.

        The state returned by the parent implementation may be the live
        ``__dict__`` of this instance. Therefore the connectors are removed
        from a copy of the state, so that reducing (e.g. pickling) the
        object never strips the connectors from the running instance.
        """
        f, t, d = super(StagedAutomotiveTestCase, self).__reduce__()  # type: ignore  # noqa: E501
        state = dict(d)
        state.pop("_StagedAutomotiveTestCase__connectors", None)
        return f, t, state

    @property
    def test_cases(self):
        # type: () -> List[AutomotiveTestCaseABC]
        """List of all TestCases of this pipeline."""
        return self.__test_cases

    @property
    def current_test_case(self):
        # type: () -> AutomotiveTestCaseABC
        """TestCase of the currently active stage."""
        return self[self.__stage_index]

    @property
    def current_connector(self):
        # type: () -> Optional[_TestCaseConnectorCallable]
        """Connector of the currently active stage, or None.

        None is returned if no connectors were given, if the list of
        connectors is shorter than the list of TestCases, or if the
        connectors were stripped from this object by __reduce__.
        """
        # __reduce__ removes the connectors from the reduced state, so a
        # restored object doesn't have this attribute at all.
        connectors = getattr(
            self, "_StagedAutomotiveTestCase__connectors", None)
        if not connectors:
            return None
        try:
            return connectors[self.__stage_index]
        except IndexError:
            # fewer connectors than test_cases were provided
            return None

    @property
    def previous_test_case(self):
        # type: () -> Optional[AutomotiveTestCaseABC]
        """TestCase of the previous stage, None during the first stage."""
        if self.__stage_index > 0:
            return self.__test_cases[self.__stage_index - 1]
        return None

    def get_generated_test_case(self):
        # type: () -> Optional[AutomotiveTestCaseABC]
        """Delegate to the current TestCase.

        :return: The result of the current TestCase, or None if an
                 AttributeError occurs, e.g. because the current TestCase
                 doesn't provide get_generated_test_case.
        """
        try:
            test_case = cast(TestCaseGenerator, self.current_test_case)
            return test_case.get_generated_test_case()
        except AttributeError:
            return None

    def get_new_edge(self,
                     socket,  # type: _SocketUnion
                     config  # type: AutomotiveTestCaseExecutorConfiguration
                     ):  # type: (...) -> Optional[_Edge]
        """Delegate to the current TestCase.

        :return: The result of the current TestCase, or None if an
                 AttributeError occurs, e.g. because the current TestCase
                 doesn't provide get_new_edge.
        """
        try:
            test_case = cast(StateGenerator, self.current_test_case)
            return test_case.get_new_edge(socket, config)
        except AttributeError:
            return None

    def get_transition_function(self, socket, edge):
        # type: (_SocketUnion, _Edge) -> Optional[_TransitionTuple]
        """Delegate to the current TestCase.

        :return: The result of the current TestCase, or None if an
                 AttributeError occurs, e.g. because the current TestCase
                 doesn't provide get_transition_function.
        """
        try:
            test_case = cast(StateGenerator, self.current_test_case)
            return test_case.get_transition_function(socket, edge)
        except AttributeError:
            return None

    def has_completed(self, state):
        # type: (EcuState) -> bool
        """Check completion of the pipeline and advance the stage if needed.

        :param state: State which is forwarded to has_completed of the
                      current TestCase
        :return: True only if the last stage is completed and the completion
                 delay has elapsed, otherwise False
        """
        if not (self.current_test_case.has_completed(state) and
                self.current_test_case.completed):
            # current test_case not fully completed
            # reset completion delay, since new states could have appeared
            self.__completion_delay = 0
            return False

        # current stage is finished. We have to increase the stage
        if self.__completion_delay < StagedAutomotiveTestCase.__delay_stages:
            # First we wait a few more iterations of the executor
            # Maybe one more execution reveals new states of other
            # test_cases
            self.__completion_delay += 1
            return False

        # current test_case is fully completed
        elif self.__stage_index == len(self.__test_cases) - 1:
            # this test_case was the last test_case... nothing to do
            return True

        else:
            # We waited more iterations and no new state appeared,
            # let's enter the next stage
            log_automotive.info(
                "Staged AutomotiveTestCase %s completed",
                self.current_test_case.__class__.__name__)
            self.__stage_index += 1
            self.__completion_delay = 0
            return False

    def pre_execute(self,
                    socket,  # type: _SocketUnion
                    state,  # type: EcuState
                    global_configuration  # type: AutomotiveTestCaseExecutorConfiguration  # noqa: E501
                    ):  # type: (...) -> None
        """Prepare the kwargs of the current TestCase and call its pre_execute.

        The kwargs are taken from ``global_configuration``, indexed by the
        class name of the current TestCase. If no entry exists, an empty
        dictionary is stored there. From the second stage on, the result of
        a callable connector is merged into these kwargs.
        """
        test_case_cls = self.current_test_case.__class__
        try:
            self.__current_kwargs = global_configuration[
                test_case_cls.__name__]
        except KeyError:
            self.__current_kwargs = dict()
            global_configuration[test_case_cls.__name__] = \
                self.__current_kwargs

        con = self.current_connector
        if callable(con) and self.__stage_index > 0:
            previous = self.previous_test_case
            if previous:
                con_kwargs = con(previous, self.current_test_case)
                if self.__current_kwargs is not None and con_kwargs is not None:  # noqa: E501
                    self.__current_kwargs.update(con_kwargs)

        log_automotive.debug("Stage AutomotiveTestCase %s kwargs: %s",
                             self.current_test_case.__class__.__name__,
                             self.__current_kwargs)

        self.current_test_case.pre_execute(socket, state, global_configuration)

    def execute(self, socket, state, **kwargs):
        # type: (_SocketUnion, EcuState, Any) -> None
        """Execute the current TestCase.

        The kwargs prepared by pre_execute take precedence over the kwargs
        given to this function.
        """
        kwargs.update(self.__current_kwargs or dict())
        self.current_test_case.execute(socket, state, **kwargs)

    def post_execute(self,
                     socket,  # type: _SocketUnion
                     state,  # type: EcuState
                     global_configuration  # type: AutomotiveTestCaseExecutorConfiguration  # noqa: E501
                     ):  # type: (...) -> None
        """Call post_execute of the current TestCase."""
        self.current_test_case.post_execute(
            socket, state, global_configuration)

    @staticmethod
    def _show_headline(headline, sep="="):
        # type: (str, str) -> str
        """Return ``headline`` framed by two lines of ``sep`` characters."""
        rule = sep * (len(headline) + 10) + "\n"
        return "\n\n" + rule + " " * 5 + headline + "\n" + rule + "\n"

    def show(self, dump=False, filtered=True, verbose=False):
        # type: (bool, bool, bool) -> Optional[str]
        """Show the results of all stages of this pipeline.

        :param dump: If True, the output is returned as string, otherwise
                     it is printed
        :param filtered: Forwarded to show of every TestCase
        :param verbose: Forwarded to show of every TestCase
        :return: The output as string if ``dump`` is True, otherwise None
        """
        parts = [self._show_headline("AutomotiveTestCase Pipeline", "=")]
        for idx, t in enumerate(self.__test_cases):
            parts.append(self._show_headline(
                "AutomotiveTestCase Stage %d" % idx, "-"))
            # a TestCase may return None instead of a string
            parts.append(t.show(True, filtered, verbose) or "")
        s = "".join(parts)

        if dump:
            return s + "\n"
        else:
            print(s)
            return None

    @property
    def completed(self):
        # type: () -> bool
        """True if all TestCases are completed and the delay has elapsed."""
        return all(e.completed for e in self.__test_cases) and \
            self.__completion_delay >= StagedAutomotiveTestCase.__delay_stages

    @property
    def supported_responses(self):
        # type: () -> List[EcuResponse]
        """Supported responses of all TestCases, sorted by Ecu.sort_key_func.
        """
        responses = list()  # type: List[EcuResponse]
        for tc in self.test_cases:
            responses += tc.supported_responses

        responses.sort(key=Ecu.sort_key_func)
        return responses

    def runtime_estimation(self):
        # type: () -> Optional[Tuple[int, int, float]]
        """Estimate the progress of this pipeline.

        :return: A tuple of the number of stages, the index of the current
                 stage and the progress. The progress is the fraction of
                 finished stages plus, if the current TestCase provides an
                 estimation, its own progress scaled by the number of stages.
        """
        # Access the current TestCase first, so that an empty pipeline fails
        # with an IndexError and not with a ZeroDivisionError.
        current = self.current_test_case
        total = len(self.test_cases)
        progress = float(self.__stage_index) / total

        if hasattr(current, "runtime_estimation"):
            cur_est = current.runtime_estimation()
            if cur_est:
                progress += cur_est[2] / total

        return total, self.__stage_index, progress