1# SPDX-License-Identifier: GPL-2.0-only 2# This file is part of Scapy 3# See https://scapy.net/ for more information 4# Copyright (C) Nils Weiss <nils@we155.de> 5 6# scapy.contrib.description = AutomotiveTestCaseExecutor base class 7# scapy.contrib.status = library 8 9import abc 10import time 11 12from itertools import product 13 14from scapy.contrib.automotive import log_automotive 15from scapy.contrib.automotive.scanner.graph import Graph 16from scapy.error import Scapy_Exception 17from scapy.supersocket import SuperSocket 18from scapy.utils import make_lined_table, SingleConversationSocket 19from scapy.contrib.automotive.ecu import EcuState, EcuResponse, Ecu 20from scapy.contrib.automotive.scanner.configuration import \ 21 AutomotiveTestCaseExecutorConfiguration 22from scapy.contrib.automotive.scanner.test_case import AutomotiveTestCaseABC, \ 23 _SocketUnion, _CleanupCallable, StateGenerator, TestCaseGenerator, \ 24 AutomotiveTestCase 25 26# Typing imports 27from typing import ( 28 Any, 29 Union, 30 List, 31 Optional, 32 Dict, 33 Callable, 34 Type, 35 cast, 36 TypeVar, 37) 38 39T = TypeVar("T") 40 41 42class AutomotiveTestCaseExecutor(metaclass=abc.ABCMeta): 43 """ 44 Base class for different automotive scanners. This class handles 45 the connection to a scan target, ensures the execution of all it's 46 test cases, and stores the system state machine 47 48 49 :param socket: A socket object to communicate with the scan target 50 :param reset_handler: A function to reset the scan target 51 :param reconnect_handler: In case the communication needs to be 52 established after a reset, provide a 53 reconnect function which returns a socket object 54 :param test_cases: A list of TestCase instances or classes 55 :param kwargs: Arguments for the internal 56 AutomotiveTestCaseExecutorConfiguration instance 57 """ 58 59 @property 60 def _initial_ecu_state(self): 61 # type: () -> EcuState 62 return EcuState(session=1) 63 64 def __init__( 65 self, 66 socket, # type: Optional[_SocketUnion] 67 reset_handler=None, # type: Optional[Callable[[], None]] 68 reconnect_handler=None, # type: Optional[Callable[[], _SocketUnion]] # noqa: E501 69 test_cases=None, 70 # type: Optional[List[Union[AutomotiveTestCaseABC, Type[AutomotiveTestCaseABC]]]] # noqa: E501 71 **kwargs # type: Optional[Dict[str, Any]] 72 ): # type: (...) -> None 73 74 # The TesterPresentSender can interfere with a test_case, since a 75 # target may only allow one request at a time. 76 # The SingleConversationSocket prevents interleaving requests. 77 if socket and not isinstance(socket, SingleConversationSocket): 78 self.socket = SingleConversationSocket(socket) # type: Optional[_SocketUnion] # noqa: E501 79 else: 80 self.socket = socket 81 82 self.target_state = self._initial_ecu_state 83 self.reset_handler = reset_handler 84 self.reconnect_handler = reconnect_handler 85 86 self.cleanup_functions = list() # type: List[_CleanupCallable] 87 88 self.configuration = AutomotiveTestCaseExecutorConfiguration( 89 test_cases or self.default_test_case_clss, **kwargs) 90 self.validate_test_case_kwargs() 91 92 def __reduce__(self): # type: ignore 93 f, t, d = super(AutomotiveTestCaseExecutor, self).__reduce__() # type: ignore # noqa: E501 94 try: 95 del d["socket"] 96 except KeyError: 97 pass 98 try: 99 del d["reset_handler"] 100 except KeyError: 101 pass 102 try: 103 del d["reconnect_handler"] 104 except KeyError: 105 pass 106 return f, t, d 107 108 @property 109 @abc.abstractmethod 110 def default_test_case_clss(self): 111 # type: () -> List[Type[AutomotiveTestCaseABC]] 112 raise NotImplementedError() 113 114 @property 115 def state_graph(self): 116 # type: () -> Graph 117 return self.configuration.state_graph 118 119 @property 120 def state_paths(self): 121 # type: () -> List[List[EcuState]] 122 """ 123 Returns all state paths. A path is represented by a list of EcuState 124 objects. 125 :return: A list of paths. 126 """ 127 paths = [Graph.dijkstra(self.state_graph, self._initial_ecu_state, s) 128 for s in self.state_graph.nodes 129 if s != self._initial_ecu_state] 130 return sorted( 131 [p for p in paths if p] + [[self._initial_ecu_state]], 132 key=lambda x: x[-1]) 133 134 @property 135 def final_states(self): 136 # type: () -> List[EcuState] 137 """ 138 Returns a list with all final states. A final state is the last 139 state of a path. 140 :return: 141 """ 142 return [p[-1] for p in self.state_paths] 143 144 @property 145 def scan_completed(self): 146 # type: () -> bool 147 return all(t.has_completed(s) for t, s in 148 product(self.configuration.test_cases, self.final_states)) 149 150 def reset_target(self): 151 # type: () -> None 152 log_automotive.info("Target reset") 153 if self.reset_handler: 154 self.reset_handler() 155 self.target_state = self._initial_ecu_state 156 157 def reconnect(self): 158 # type: () -> None 159 if self.reconnect_handler: 160 try: 161 if self.socket: 162 self.socket.close() 163 except Exception as e: 164 log_automotive.exception( 165 "Exception '%s' during socket.close", e) 166 167 log_automotive.info("Target reconnect") 168 socket = self.reconnect_handler() 169 if not isinstance(socket, SingleConversationSocket): 170 self.socket = SingleConversationSocket(socket) 171 else: 172 self.socket = socket 173 174 if self.socket and self.socket.closed: 175 raise Scapy_Exception( 176 "Socket closed even after reconnect. Stop scan!") 177 178 def execute_test_case(self, test_case, kill_time=None): 179 # type: (AutomotiveTestCaseABC, Optional[float]) -> None 180 """ 181 This function ensures the correct execution of a testcase, including 182 the pre_execute, execute and post_execute. 183 Finally, the testcase is asked if a new edge or a new testcase was 184 generated. 185 186 :param test_case: A test case to be executed 187 :param kill_time: If set, this defines the maximum execution time for 188 the current test_case 189 :return: None 190 """ 191 192 if not self.socket: 193 log_automotive.warning("Socket is None! Leaving execute_test_case") 194 return 195 196 test_case.pre_execute( 197 self.socket, self.target_state, self.configuration) 198 199 try: 200 test_case_kwargs = self.configuration[test_case.__class__.__name__] 201 except KeyError: 202 test_case_kwargs = dict() 203 204 if kill_time: 205 max_execution_time = max(int(kill_time - time.monotonic()), 5) 206 cur_execution_time = test_case_kwargs.get("execution_time", 1200) 207 test_case_kwargs["execution_time"] = min(max_execution_time, 208 cur_execution_time) 209 210 log_automotive.debug("Execute test_case %s with args %s", 211 test_case.__class__.__name__, test_case_kwargs) 212 213 test_case.execute(self.socket, self.target_state, **test_case_kwargs) 214 test_case.post_execute( 215 self.socket, self.target_state, self.configuration) 216 217 self.check_new_states(test_case) 218 self.check_new_testcases(test_case) 219 220 if hasattr(test_case, "runtime_estimation"): 221 estimation = test_case.runtime_estimation() 222 if estimation is not None: 223 log_automotive.debug( 224 "[i] Test_case %s: TODO %d, " 225 "DONE %d, TOTAL %0.2f", 226 test_case.__class__.__name__, estimation[0], 227 estimation[1], estimation[2]) 228 229 def check_new_testcases(self, test_case): 230 # type: (AutomotiveTestCaseABC) -> None 231 if isinstance(test_case, TestCaseGenerator): 232 new_test_case = test_case.get_generated_test_case() 233 if new_test_case: 234 log_automotive.debug("Testcase generated %s", new_test_case) 235 self.configuration.add_test_case(new_test_case) 236 237 def check_new_states(self, test_case): 238 # type: (AutomotiveTestCaseABC) -> None 239 if not self.socket: 240 log_automotive.warning("Socket is None! Leaving check_new_states") 241 return 242 243 if isinstance(test_case, StateGenerator): 244 edge = test_case.get_new_edge(self.socket, self.configuration) 245 if edge: 246 log_automotive.debug("Edge found %s", edge) 247 tf = test_case.get_transition_function(self.socket, edge) 248 self.state_graph.add_edge(edge, tf) 249 250 def validate_test_case_kwargs(self): 251 # type: () -> None 252 for test_case in self.configuration.test_cases: 253 if isinstance(test_case, AutomotiveTestCase): 254 test_case_kwargs = self.configuration[test_case.__class__.__name__] 255 test_case.check_kwargs(test_case_kwargs) 256 257 def stop_scan(self): 258 # type: () -> None 259 self.configuration.stop_event.set() 260 log_automotive.debug("Internal stop event set!") 261 262 def progress(self): 263 # type: () -> float 264 progress = [] 265 for tc in self.configuration.test_cases: 266 if not hasattr(tc, "runtime_estimation"): 267 continue 268 est = tc.runtime_estimation() 269 if est is None: 270 continue 271 progress.append(est[2]) 272 273 return sum(progress) / len(progress) if len(progress) else 0.0 274 275 def scan(self, timeout=None): 276 # type: (Optional[int]) -> None 277 """ 278 Executes all testcases for a given time. 279 :param timeout: Time for execution. 280 :return: None 281 """ 282 self.configuration.stop_event.clear() 283 if timeout is None: 284 kill_time = None 285 else: 286 kill_time = time.monotonic() + timeout 287 while kill_time is None or kill_time > time.monotonic(): 288 test_case_executed = False 289 log_automotive.info("[i] Scan progress %0.2f", self.progress()) 290 log_automotive.debug("[i] Scan paths %s", self.state_paths) 291 for p, test_case in product( 292 self.state_paths, self.configuration.test_cases): 293 log_automotive.info("Scan path %s", p) 294 terminate = kill_time and kill_time <= time.monotonic() 295 if terminate or self.configuration.stop_event.is_set(): 296 log_automotive.debug( 297 "Execution time exceeded. Terminating scan!") 298 break 299 300 final_state = p[-1] 301 if test_case.has_completed(final_state): 302 log_automotive.debug("State %s for %s completed", 303 repr(final_state), test_case) 304 continue 305 306 try: 307 if not self.enter_state_path(p): 308 log_automotive.error( 309 "Error entering path %s", p) 310 continue 311 log_automotive.info( 312 "Execute %s for path %s", str(test_case), p) 313 self.execute_test_case(test_case, kill_time) 314 test_case_executed = True 315 except (OSError, ValueError, Scapy_Exception) as e: 316 log_automotive.exception("Exception: %s", e) 317 if self.configuration.debug: 318 raise e 319 if isinstance(e, OSError): 320 log_automotive.exception( 321 "OSError occurred, closing socket") 322 if self.socket: 323 self.socket.close() 324 if (self.socket 325 and cast(SuperSocket, self.socket).closed 326 and self.reconnect_handler is None): 327 log_automotive.critical( 328 "Socket went down. Need to leave scan") 329 raise e 330 finally: 331 self.cleanup_state() 332 333 if not test_case_executed: 334 log_automotive.info( 335 "Execute failure or scan completed. Exit scan!") 336 break 337 338 self.cleanup_state() 339 self.reset_target() 340 341 def enter_state_path(self, path): 342 # type: (List[EcuState]) -> bool 343 """ 344 Resets and reconnects to a target and applies all transition functions 345 to traversal a given path. 346 :param path: Path to be applied to the scan target. 347 :return: True, if all transition functions could be executed. 348 """ 349 if path[0] != self._initial_ecu_state: 350 raise Scapy_Exception( 351 "Initial state of path not equal reset state of the target") 352 353 self.reset_target() 354 self.reconnect() 355 356 if len(path) == 1: 357 return True 358 359 for next_state in path[1:]: 360 if self.configuration.stop_event.is_set(): 361 self.cleanup_state() 362 return False 363 364 edge = (self.target_state, next_state) 365 self.configuration.stop_event.wait( 366 timeout=self.configuration.delay_enter_state) 367 if not self.enter_state(*edge): 368 self.state_graph.downrate_edge(edge) 369 self.cleanup_state() 370 return False 371 return True 372 373 def enter_state(self, prev_state, next_state): 374 # type: (EcuState, EcuState) -> bool 375 """ 376 Obtains a transition function from the system state graph and executes 377 it. On success, the cleanup function is added for a later cleanup of 378 the new state. 379 :param prev_state: Current state 380 :param next_state: Desired state 381 :return: True, if state could be changed successful 382 """ 383 if not self.socket: 384 log_automotive.warning("Socket is None! Leaving enter_state") 385 return False 386 387 edge = (prev_state, next_state) 388 funcs = self.state_graph.get_transition_tuple_for_edge(edge) 389 390 if funcs is None: 391 log_automotive.error("No transition function for %s", edge) 392 return False 393 394 trans_func, trans_kwargs, clean_func = funcs 395 state_changed = trans_func( 396 self.socket, self.configuration, trans_kwargs) 397 if state_changed: 398 self.target_state = next_state 399 400 if clean_func is not None: 401 self.cleanup_functions += [clean_func] 402 return True 403 else: 404 log_automotive.info("Transition for edge %s failed", edge) 405 return False 406 407 def cleanup_state(self): 408 # type: () -> None 409 """ 410 Executes all collected cleanup functions from a traversed path 411 :return: None 412 """ 413 if not self.socket: 414 log_automotive.warning("Socket is None! Leaving cleanup_state") 415 return 416 417 for f in self.cleanup_functions: 418 if not callable(f): 419 continue 420 try: 421 if not f(self.socket, self.configuration): 422 log_automotive.info( 423 "Cleanup function %s failed", repr(f)) 424 except (OSError, ValueError, Scapy_Exception) as e: 425 log_automotive.critical("Exception during cleanup: %s", e) 426 427 self.cleanup_functions = list() 428 429 def show_testcases(self): 430 # type: () -> None 431 for t in self.configuration.test_cases: 432 t.show() 433 434 def show_testcases_status(self): 435 # type: () -> None 436 data = list() 437 for t in self.configuration.test_cases: 438 for s in self.state_graph.nodes: 439 data += [(repr(s), t.__class__.__name__, t.has_completed(s))] 440 make_lined_table(data, lambda *tup: (tup[0], tup[1], tup[2])) 441 442 def get_test_cases_by_class(self, cls): 443 # type: (Type[T]) -> List[T] 444 return [x for x in self.configuration.test_cases if isinstance(x, cls)] 445 446 @property 447 def supported_responses(self): 448 # type: () -> List[EcuResponse] 449 """ 450 Returns a sorted list of supported responses, gathered from all 451 enumerators. The sort is done in a way 452 to provide the best possible results, if this list of supported 453 responses is used to simulate an real world Ecu with the 454 EcuAnsweringMachine object. 455 :return: A sorted list of EcuResponse objects 456 """ 457 supported_responses = list() 458 for tc in self.configuration.test_cases: 459 supported_responses += tc.supported_responses 460 461 supported_responses.sort(key=Ecu.sort_key_func) 462 return supported_responses 463