# SPDX-License-Identifier: GPL-2.0-only
# This file is part of Scapy
# See https://scapy.net/ for more information
# Copyright (C) Nils Weiss

# scapy.contrib.description = ServiceEnumerator definitions
# scapy.contrib.status = library

import abc
import threading
import time
import copy
from collections import defaultdict, OrderedDict
from itertools import chain
from typing import NamedTuple

from scapy.compat import orb
from scapy.contrib.automotive import log_automotive
from scapy.error import Scapy_Exception
from scapy.utils import make_lined_table, EDecimal, PeriodicSenderThread
from scapy.packet import Packet
from scapy.contrib.automotive.ecu import EcuState, EcuResponse
from scapy.contrib.automotive.scanner.test_case import AutomotiveTestCase, \
    StateGenerator, _SocketUnion, _TransitionTuple
from scapy.contrib.automotive.scanner.configuration import \
    AutomotiveTestCaseExecutorConfiguration
from scapy.contrib.automotive.scanner.graph import _Edge

# Typing imports
from typing import (
    Any,
    Union,
    List,
    Optional,
    Iterable,
    Dict,
    Tuple,
    Set,
    Callable,
    cast,
)

# Definition outside the class ServiceEnumerator to allow pickling
_AutomotiveTestCaseScanResult = NamedTuple(
    "_AutomotiveTestCaseScanResult",
    [("state", EcuState),
     ("req", Packet),
     ("resp", Optional[Packet]),
     ("req_ts", Union[EDecimal, float]),
     ("resp_ts", Optional[Union[EDecimal, float]])])

# Same layout as above, but for results where a response (and its timestamp)
# is known to be present.
_AutomotiveTestCaseFilteredScanResult = NamedTuple(
    "_AutomotiveTestCaseFilteredScanResult",
    [("state", EcuState),
     ("req", Packet),
     ("resp", Packet),
     ("req_ts", Union[EDecimal, float]),
     ("resp_ts", Union[EDecimal, float])])


class ServiceEnumerator(AutomotiveTestCase, metaclass=abc.ABCMeta):
    """
    Base class for ServiceEnumerators of automotive diagnostic protocols
    """

    # Each entry maps a keyword argument name to a tuple of
    # (accepted type(s), optional validation callable).
    _supported_kwargs = copy.copy(AutomotiveTestCase._supported_kwargs)
    _supported_kwargs.update({
        'timeout': ((int, float), lambda x: x > 0),
        'count': (int, lambda x: x >= 0),
        'execution_time': (int, None),
        'state_allow_list': ((list, EcuState), None),
        'state_block_list': ((list, EcuState), None),
        'retry_if_none_received': (bool, None),
        'exit_if_no_answer_received': (bool, None),
        'exit_if_service_not_supported': (bool, None),
        'exit_scan_on_first_negative_response': (bool, None),
        'retry_if_busy_returncode': (bool, None),
        'stop_event': (threading.Event, None),
        'debug': (bool, None),
        'scan_range': ((list, tuple, range), None),
        'unittest': (bool, None),
        'disable_tps_while_sending': (bool, None),
        'inter': ((int, float), lambda x: x >= 0),
    })

    # This text is also installed as the docstring of `execute` (see below).
    _supported_kwargs_doc = AutomotiveTestCase._supported_kwargs_doc + """
        :param timeout: Timeout until a response will arrive after a request
        :type timeout: integer or float
        :param integer count: Number of request to be sent in one execution
        :param int execution_time: Time in seconds until the execution of
                                   this enumerator is stopped.
        :param state_allow_list: List of EcuState objects or EcuState object
                                 in which the the execution of this enumerator
                                 is allowed. If provided, other states will not
                                 be executed.
        :type state_allow_list: EcuState or list
        :param state_block_list: List of EcuState objects or EcuState object
                                 in which the the execution of this enumerator
                                 is blocked.
        :type state_block_list: EcuState or list
        :param bool retry_if_none_received: Specifies if a request will be send
                                            again, if None was received
                                            (usually because of a timeout).
        :param bool exit_if_no_answer_received: Specifies to finish the
                                                execution of this enumerator
                                                once None is received.
        :param bool exit_if_service_not_supported: Specifies to finish the
                                                   execution of this
                                                   enumerator, once the
                                                   negative return code
                                                   'serviceNotSupported' is
                                                   received.
        :param bool exit_scan_on_first_negative_response: Specifies to finish
                                                          the execution once a
                                                          negative response is
                                                          received.
        :param bool retry_if_busy_returncode: Specifies to retry a request, if
                                              the 'busyRepeatRequest' negative
                                              response code is received.
        :param bool debug: Enables debug functions during execute.
        :param Event stop_event: Signals immediate stop of the execution.
        :param scan_range: Specifies the identifiers to be scanned.
        :type scan_range: list or tuple or range or iterable
        :param disable_tps_while_sending: Temporary disables a
                                          TesterPresentSender to not interact
                                          with a seed request.
        :type disable_tps_while_sending: bool
        :param inter: delay between two packets during sending
        :type inter: int or float"""

    def __init__(self):
        # type: () -> None
        """
        Initializes the empty result, iterator and retry storages.
        """
        super(ServiceEnumerator, self).__init__()
        # Every distinct request/response packet, keyed by its raw bytes, so
        # that identical packets are stored only once.
        self._result_packets = OrderedDict()  # type: Dict[bytes, Packet]
        self._results = list()  # type: List[_AutomotiveTestCaseScanResult]
        # Per-state iterator over the requests which still have to be sent.
        self._request_iterators = dict()  # type: Dict[EcuState, Iterable[Packet]]  # noqa: E501
        # Per-state packet(s) which have to be sent again on next execute.
        self._retry_pkt = defaultdict(list)  # type: Dict[EcuState, Union[Packet, Iterable[Packet]]]  # noqa: E501
        # Negative response codes hidden from `filtered_results`.
        self._negative_response_blacklist = [0x10, 0x11]  # type: List[int]
        self._requests_per_state_estimated = None  # type: Optional[int]
        self._tester_present_sender = None  # type: Optional[PeriodicSenderThread]  # noqa: E501

    @staticmethod
    @abc.abstractmethod
    def _get_negative_response_code(resp):
        # type: (Packet) -> int
        """
        Protocol specific extraction of the negative response code.

        :param resp: A negative response packet
        :return: The negative response code of this packet
        """
        raise NotImplementedError()

    @staticmethod
    @abc.abstractmethod
    def _get_negative_response_desc(nrc):
        # type: (int) -> str
        """
        Protocol specific description of a negative response code.

        :param nrc: A negative response code
        :return: A string which describes the negative response code
        """
        raise NotImplementedError()

    def _get_table_entry_x(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        """
        Provides a table entry for the column which gets print during `show()`.

        :param tup: A results tuple
        :return: A string which describes the state
        """
        return str(tup[0])

    def _get_table_entry_y(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        """
        Provides a table entry for the line which gets print during `show()`.

        :param tup: A results tuple
        :return: A string which describes the request
        """
        return repr(tup[1])

    def _get_table_entry_z(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        """
        Provides a table entry for the field which gets print during `show()`.

        :param tup: A results tuple
        :return: A string which describes the response
        """
        return repr(tup[2])

    @staticmethod
    @abc.abstractmethod
    def _get_negative_response_label(response):
        # type: (Packet) -> str
        """
        Protocol specific label for a negative response.

        :param response: A negative response packet
        :return: A string which labels this negative response
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def _get_initial_requests(self, **kwargs):
        # type: (Any) -> Iterable[Packet]
        """
        Provides the requests this enumerator sends in every state.

        :param kwargs: Arguments passed to `execute`
        :return: An iterable of request packets
        """
        raise NotImplementedError("Overwrite this method")

    def __reduce__(self):  # type: ignore
        """
        Pickle support. Iterators stored in the request and retry storages
        are converted to lists before they are handed to pickle.

        :return: The reduce tuple of the super class with list-ified storages
        """
        f, t, d = super(ServiceEnumerator, self).__reduce__()  # type: ignore
        try:
            for k, v in d["_request_iterators"].items():
                d["_request_iterators"][k] = list(v)
        except KeyError:
            pass

        try:
            for k in d["_retry_pkt"]:
                d["_retry_pkt"][k] = list(self._get_retry_iterator(k))
        except KeyError:
            pass
        return f, t, d

    @property
    def negative_response_blacklist(self):
        # type: () -> List[int]
        """
        :return: The (mutable) list of negative response codes which are
                 excluded from `filtered_results`
        """
        return self._negative_response_blacklist

    @property
    def completed(self):
        # type: () -> bool
        """
        :return: If results are present, True if every scanned state has
                 completed. Otherwise the result of the super class.
        """
        if len(self._results):
            # The list is built on purpose: `has_completed` is called for
            # every scanned state, exactly as before.
            return all([self.has_completed(s) for s in self.scanned_states])
        else:
            return super(ServiceEnumerator, self).completed

    def _store_result(self, state, req, res):
        # type: (EcuState, Packet, Optional[Packet]) -> None
        """
        Appends a result tuple to the results. Packets are de-duplicated
        through `_result_packets`, so equal packets share one object.

        :param state: State in which the request was sent
        :param req: Sent request
        :param res: Received response or None
        """
        if bytes(req) not in self._result_packets:
            self._result_packets[bytes(req)] = req

        if res and bytes(res) not in self._result_packets:
            self._result_packets[bytes(res)] = res

        self._results.append(_AutomotiveTestCaseScanResult(
            state,
            self._result_packets[bytes(req)],
            self._result_packets[bytes(res)] if res is not None else None,
            req.sent_time or 0.0,
            res.time if res is not None else None))

    def _get_retry_iterator(self, state):
        # type: (EcuState) -> Iterable[Packet]
        """
        Provides the packets which have to be sent again in a state.

        :param state: State of interest
        :return: A list with the single retry packet, or the stored retry
                 entry (an empty list if no retry is pending)
        """
        retry_entry = self._retry_pkt[state]
        if isinstance(retry_entry, Packet):
            log_automotive.debug("Provide retry packet")
            return [retry_entry]
        elif isinstance(retry_entry, list):
            if len(retry_entry):
                log_automotive.debug("Provide retry list")
        else:
            log_automotive.debug("Provide retry iterator")
        # assume self.retry_pkt is a generator or list
        return retry_entry

    def _get_initial_request_iterator(self, state, **kwargs):
        # type: (EcuState, Any) -> Iterable[Packet]
        """
        Provides the iterator over the initial requests of a state. The
        iterator is created once per state and reused afterwards, so a
        later execution continues where the previous one stopped.

        :param state: State of interest
        :param kwargs: Arguments passed to `_get_initial_requests`
        :return: The request iterator of this state
        """
        if state not in self._request_iterators:
            self._request_iterators[state] = iter(
                self._get_initial_requests(**kwargs))

        return self._request_iterators[state]

    def _get_request_iterator(self, state, **kwargs):
        # type: (EcuState, Optional[Dict[str, Any]]) -> Iterable[Packet]
        """
        Provides all requests for a state: pending retries first, followed
        by the remaining initial requests.

        :param state: State of interest
        :param kwargs: Arguments passed to `_get_initial_requests`
        :return: Chained iterator of retry and initial requests
        """
        return chain(self._get_retry_iterator(state),
                     self._get_initial_request_iterator(state, **kwargs))

    def _prepare_runtime_estimation(self, **kwargs):
        # type: (Optional[Dict[str, Any]]) -> None
        """
        Counts the initial requests once and stores the number for
        `runtime_estimation`. Nothing is stored if `_get_initial_requests`
        raises NotImplementedError.

        :param kwargs: Arguments passed to `_get_initial_requests`
        """
        if self._requests_per_state_estimated is None:
            try:
                initial_requests = self._get_initial_requests(**kwargs)
                self._requests_per_state_estimated = len(
                    list(initial_requests))
            except NotImplementedError:
                pass

    def runtime_estimation(self):
        # type: () -> Optional[Tuple[int, int, float]]
        """
        Estimates the progress of this enumerator.

        :return: None if no estimation is available, otherwise a tuple of
                 (estimated number of packets to be sent, number of stored
                 results, ratio of both values)
        """
        if self._requests_per_state_estimated is None:
            return None

        # max(..., 1) avoids a division by zero if nothing was scanned yet
        pkts_tbs = max(
            len(self.scanned_states) * self._requests_per_state_estimated, 1)
        pkts_snt = len(self.results)

        return pkts_tbs, pkts_snt, float(pkts_snt) / pkts_tbs

    def pre_execute(self, socket, state, global_configuration):
        # type: (_SocketUnion, EcuState, AutomotiveTestCaseExecutorConfiguration) -> None  # noqa: E501
        """
        Fetches the "tps" entry from the global configuration and stores it
        as tester present sender. None is stored if the entry is missing.

        :param socket: Socket to the DUT (unused)
        :param state: Current state (unused)
        :param global_configuration: Global configuration of the executor
        """
        try:
            self._tester_present_sender = global_configuration["tps"]
        except KeyError:
            self._tester_present_sender = None

    def execute(self, socket, state, **kwargs):
        # type: (_SocketUnion, EcuState, Any) -> None
        # NOTE: No docstring here on purpose. `execute.__doc__` is assigned
        # from `_supported_kwargs_doc` right after this function.
        self.check_kwargs(kwargs)
        timeout = kwargs.pop('timeout', 1)
        count = kwargs.pop('count', None)
        execution_time = kwargs.pop("execution_time", 1200)
        stop_event = kwargs.pop("stop_event", None)  # type: Optional[threading.Event]  # noqa: E501
        disable_tps = kwargs.pop("disable_tps_while_sending", False)
        inter = kwargs.pop("inter", 0)

        self._prepare_runtime_estimation(**kwargs)

        state_block_list = kwargs.get('state_block_list', list())

        if state_block_list and state in state_block_list:
            self._state_completed[state] = True
            log_automotive.debug("State %s in block list!", repr(state))
            return

        state_allow_list = kwargs.get('state_allow_list', list())

        if state_allow_list and state not in state_allow_list:
            self._state_completed[state] = True
            log_automotive.debug("State %s not in allow list!",
                                 repr(state))
            return

        it = self._get_request_iterator(state, **kwargs)

        # log_automotive.debug("[i] Using iterator %s in state %s", it, state)

        start_time = time.monotonic()
        log_automotive.debug(
            "Start execution of enumerator: %s", time.ctime())

        for req in it:
            # Delay between two requests. Waiting on the stop event allows
            # the delay to be interrupted.
            if stop_event:
                stop_event.wait(timeout=inter)
            else:
                time.sleep(inter)

            tps = self._tester_present_sender if disable_tps else None
            if tps:
                tps.disable()

            try:
                res = self.sr1_with_retry_on_error(req, socket, state, timeout)
            finally:
                # sr1_with_retry_on_error re-raises socket errors. Enable
                # the sender in any case, so it is never left disabled.
                if tps:
                    tps.enable()

            self._store_result(state, req, res)

            if self._evaluate_response(state, req, res, **kwargs):
                log_automotive.debug(
                    "Stop test_case execution because of response evaluation")
                return

            if count is not None:
                count -= 1
                if count <= 0:
                    log_automotive.debug(
                        "Finished execution count of enumerator")
                    return

            if (start_time + execution_time) < time.monotonic():
                log_automotive.debug(
                    "[i] Finished execution time of enumerator: %s",
                    time.ctime())
                return

            if stop_event is not None and stop_event.is_set():
                log_automotive.info(
                    "Stop test_case execution because of stop event")
                return

        # Only reached if the iterator was exhausted without early exit
        log_automotive.info("Finished iterator execution")
        self._state_completed[state] = True
        log_automotive.debug("States completed %s",
                             repr(self._state_completed))

    execute.__doc__ = _supported_kwargs_doc

    def sr1_with_retry_on_error(self, req, socket, state, timeout):
        # type: (Packet, _SocketUnion, EcuState, int) -> Optional[Packet]
        """
        Sends a request and returns the response. If the socket raises an
        error, the request is stored for a retry and the error is raised
        again.

        :param req: Request to send
        :param socket: Socket to the DUT
        :param state: Current state
        :param timeout: Timeout for the response
        :return: The received response or None
        :raises OSError, ValueError, Scapy_Exception: errors of the socket
        """
        try:
            res = socket.sr1(req, timeout=timeout, verbose=False,
                             chainEX=True, chainCC=True)
        except (OSError, ValueError, Scapy_Exception) as e:
            if not self._populate_retry(state, req):
                log_automotive.exception(
                    "Exception during retry. This is bad")
            raise e
        return res

    def _evaluate_response(self,
                           state,  # type: EcuState
                           request,  # type: Packet
                           response,  # type: Optional[Packet]
                           **kwargs  # type: Optional[Dict[str, Any]]
                           ):  # type: (...) -> bool
        """
        Evaluates the response and determines if the current scan execution
        should be stopped.

        :param state: Current state of the ECU under test
        :param request: Sent request
        :param response: Received response
        :param kwargs: Arguments to modify the behavior of this function.
                       Supported arguments:
                         - retry_if_none_received: True/False
                         - exit_if_no_answer_received: True/False
                         - exit_if_service_not_supported: True/False
                         - exit_scan_on_first_negative_response: True/False
                         - retry_if_busy_returncode: True/False
        :return: True, if current execution needs to be interrupted.
                 False, if enumerator should proceed with the execution.
        """
        if response is None:
            if cast(bool, kwargs.pop("retry_if_none_received", False)):
                log_automotive.debug(
                    "Retry %s because None received", repr(request))
                return self._populate_retry(state, request)
            return cast(bool, kwargs.pop("exit_if_no_answer_received", False))

        if self._evaluate_negative_response_code(
                state, response, **kwargs):
            # leave current execution, because of a negative response code
            return True

        if self._evaluate_retry(state, request, response, **kwargs):
            # leave current execution, because a retry was set
            return True

        # cleanup retry packet
        self._retry_pkt[state] = []

        return self._evaluate_ecu_state_modifications(state, request, response)

    def _evaluate_ecu_state_modifications(self,
                                          state,  # type: EcuState
                                          request,  # type: Packet
                                          response,  # type: Packet
                                          ):  # type: (...) -> bool
        """
        Checks if the response modified the state of the ECU.

        :param state: Current state of the ECU under test
        :param request: Sent request
        :param response: Received response
        :return: True, if the state derived from the response differs from
                 the current state. Otherwise False.
        """
        if EcuState.is_modifier_pkt(response):
            if state != EcuState.get_modified_ecu_state(
                    response, request, state):
                log_automotive.debug(
                    "Exit execute. Ecu state was modified!")
                return True
        return False

    def _evaluate_negative_response_code(self,
                                         state,  # type: EcuState
                                         response,  # type: Packet
                                         **kwargs  # type: Optional[Dict[str, Any]]  # noqa: E501
                                         ):  # type: (...) -> bool
        """
        Evaluates a negative response (service 0x7f) against the exit
        options.

        :param state: Current state of the ECU under test
        :param response: Received response
        :param kwargs: Supported arguments:
                         - exit_if_service_not_supported: True/False
                         - exit_scan_on_first_negative_response: True/False
        :return: True, if current execution needs to be interrupted.
        """
        exit_if_service_not_supported = \
            kwargs.pop("exit_if_service_not_supported", False)
        exit_scan_on_first_negative_response = \
            kwargs.pop("exit_scan_on_first_negative_response", False)

        if exit_scan_on_first_negative_response and response.service == 0x7f:
            return True

        if exit_if_service_not_supported and response.service == 0x7f:
            response_code = self._get_negative_response_code(response)
            if response_code in [0x11, 0x7f]:
                names = {0x11: "serviceNotSupported",
                         0x7f: "serviceNotSupportedInActiveSession"}
                log_automotive.debug(
                    "Exit execute because negative response %s received!",
                    names[response_code])
                # execute of current state is completed,
                # since a serviceNotSupported negative response was received
                self._state_completed[state] = True
                # stop current execute and exit
                return True
        return False

    def _populate_retry(self,
                        state,  # type: EcuState
                        request,  # type: Packet
                        ):  # type: (...) -> bool
        """
        Populates internal storage with request for a retry.

        :param state: Current state
        :param request: Request which needs a retry
        :return: True, if storage was populated. If False is returned, the
                 retry storage is still populated. This indicates that the
                 current execution was already a retry execution.
        """

        if not self._get_retry_iterator(state):
            # This was no retry since the retry_pkt is None
            self._retry_pkt[state] = request
            log_automotive.debug(
                "Exit execute. Retry packet next time!")
            return True
        else:
            # This was a unsuccessful retry, continue execute
            log_automotive.debug("Unsuccessful retry!")
            return False

    def _evaluate_retry(self,
                        state,  # type: EcuState
                        request,  # type: Packet
                        response,  # type: Packet
                        **kwargs  # type: Optional[Dict[str, Any]]
                        ):  # type: (...) -> bool
        """
        Stores the request for a retry, if a negative response with the
        response code 0x21 was received.

        :param state: Current state of the ECU under test
        :param request: Sent request
        :param response: Received response
        :param kwargs: Supported arguments:
                         - retry_if_busy_returncode: True/False
                           (default True)
        :return: True, if a retry was stored and the current execution needs
                 to be interrupted. Otherwise False.
        """
        retry_if_busy_returncode = \
            kwargs.pop("retry_if_busy_returncode", True)

        if retry_if_busy_returncode and response.service == 0x7f \
                and self._get_negative_response_code(response) == 0x21:
            log_automotive.debug(
                "Retry %s because retry_if_busy_returncode received",
                repr(request))
            return self._populate_retry(state, request)
        return False

    def _compute_statistics(self):
        # type: () -> List[Tuple[str, str, str]]
        """
        Computes answer counts and answer times over all results and per
        state.

        :return: List of (data set description, statistic name, value)
                 tuples. Values which can not be computed are "-".
        """
        data_sets = [("all", self._results)]

        for state in self._state_completed.keys():
            data_sets.append((repr(state),
                              [r for r in self._results if r.state == state]))

        stats = list()  # type: List[Tuple[str, str, str]]

        for desc, data in data_sets:
            answered = [cast(_AutomotiveTestCaseFilteredScanResult, r)
                        for r in data if r.resp is not None and
                        r.resp_ts is not None]
            unanswered = [r for r in data if r.resp is None]
            answertimes = [float(x.resp_ts) - float(x.req_ts)
                           for x in answered]
            answertimes_nr = [float(x.resp_ts) - float(x.req_ts)
                              for x in answered if x.resp.service == 0x7f]
            answertimes_pr = [float(x.resp_ts) - float(x.req_ts)
                              for x in answered if x.resp.service != 0x7f]

            nrs = [r.resp for r in answered if r.resp.service == 0x7f]
            stats.append((desc, "num_answered", str(len(answered))))
            stats.append((desc, "num_unanswered", str(len(unanswered))))
            stats.append((desc, "num_negative_resps", str(len(nrs))))

            for postfix, times in zip(
                    ["", "_nr", "_pr"],
                    [answertimes, answertimes_nr, answertimes_pr]):
                # max/min raise ValueError on empty input
                try:
                    ma = str(round(max(times), 5))
                except ValueError:
                    ma = "-"

                try:
                    mi = str(round(min(times), 5))
                except ValueError:
                    mi = "-"

                try:
                    avg = str(round(sum(times) / len(times), 5))
                except (ValueError, ZeroDivisionError):
                    avg = "-"

                stats.append((desc, "answertime_min" + postfix, mi))
                stats.append((desc, "answertime_max" + postfix, ma))
                stats.append((desc, "answertime_avg" + postfix, avg))

        return stats

    def _show_statistics(self, **kwargs):
        # type: (Any) -> str
        """
        Renders the request counters and the statistics table.

        :param kwargs: Unused
        :return: The statistics as string
        """
        stats = self._compute_statistics()

        s = "%d requests were sent, %d answered, %d unanswered" % \
            (len(self._results),
             len(self.results_with_response),
             len(self.results_without_response)) + "\n"

        s += "Statistics per state\n"
        s += make_lined_table(stats, lambda *x: x, dump=True, sortx=str,
                              sorty=str) or ""

        return s + "\n"

    def _prepare_negative_response_blacklist(self):
        # type: () -> None
        """
        Updates the negative response blacklist from the received negative
        responses. A code is added if it was received more than 30 times and
        makes up more than 30% of all negative responses. A blacklisted code
        is removed if it was received less than 10 times.
        """
        negative_results = self.results_with_negative_response

        nrc_dict = defaultdict(int)  # type: Dict[int, int]
        for nr in negative_results:
            nrc_dict[self._get_negative_response_code(nr.resp)] += 1

        total_nr_count = len(negative_results)
        for nrc, nr_count in nrc_dict.items():
            if nrc not in self.negative_response_blacklist and \
                    nr_count > 30 and (nr_count / total_nr_count) > 0.3:
                log_automotive.info("Added NRC 0x%02x to filter", nrc)
                self.negative_response_blacklist.append(nrc)

            if nrc in self.negative_response_blacklist and nr_count < 10:
                log_automotive.info("Removed NRC 0x%02x from filter", nrc)
                self.negative_response_blacklist.remove(nrc)

    @property
    def results(self):
        # type: () -> List[_AutomotiveTestCaseScanResult]
        """
        :return: all results
        """
        return self._results

    @property
    def results_with_response(self):
        # type: () -> List[_AutomotiveTestCaseFilteredScanResult]
        """
        Helper function to get all results with a response

        :return: a new list of all results where a response and a response
                 timestamp are present
        """
        filtered_results = list()
        for r in self._results:
            if r.resp is None:
                continue
            if r.resp_ts is None:
                continue
            fr = cast(_AutomotiveTestCaseFilteredScanResult, r)
            filtered_results.append(fr)
        return filtered_results

    @property
    def filtered_results(self):
        # type: () -> List[_AutomotiveTestCaseFilteredScanResult]
        """
        Helper function to get all results of interest

        :return: all results with positive response, followed by the results
                 with a negative response whose response code is not in the
                 negative response blacklist
        """
        filtered_results = self.results_with_positive_response
        for r in self.results_with_negative_response:
            nrc = self._get_negative_response_code(r.resp)
            if nrc not in self.negative_response_blacklist:
                filtered_results.append(r)
        return filtered_results

    @property
    def scanned_states(self):
        # type: () -> Set[EcuState]
        """
        Helper function to get all scanned states in results

        :return: all scanned states
        """
        return set([tup.state for tup in self._results])

    @property
    def results_with_negative_response(self):
        # type: () -> List[_AutomotiveTestCaseFilteredScanResult]
        """
        Helper function to get all results with negative response

        :return: all results with negative response
        """
        return [r for r in self.results_with_response
                if r.resp and r.resp.service == 0x7f]

    @property
    def results_with_positive_response(self):
        # type: () -> List[_AutomotiveTestCaseFilteredScanResult]
        """
        Helper function to get all results with positive response

        :return: all results with positive response
        """
        return [r for r in self.results_with_response  # noqa: E501
                if r.resp and r.resp.service != 0x7f]

    @property
    def results_without_response(self):
        # type: () -> List[_AutomotiveTestCaseScanResult]
        """
        Helper function to get all results without response

        :return: all results without response
        """
        return [r for r in self._results if r.resp is None]

    def _show_negative_response_details(self, **kwargs):
        # type: (Any) -> str
        """
        Renders how often each negative response code was received.

        :param kwargs: Unused
        :return: The details as string
        """
        nrc_dict = defaultdict(int)  # type: Dict[int, int]
        for nr in self.results_with_negative_response:
            nrc_dict[self._get_negative_response_code(nr.resp)] += 1

        s = "These negative response codes were received " + \
            " ".join([hex(c) for c in nrc_dict.keys()]) + "\n"
        for nrc, nr_count in nrc_dict.items():
            s += "\tNRC 0x%02x: %s received %d times" % (
                nrc, self._get_negative_response_desc(nrc), nr_count)
            s += "\n"
        return s + "\n"

    def _show_negative_response_information(self, **kwargs):
        # type: (Any) -> str
        """
        Renders the number of negative responses, their details and, if
        filtering is active, the blacklisted negative response codes.

        :param kwargs: Supported arguments:
                         - filtered: True/False (default True)
        :return: The information as string
        """
        filtered = kwargs.get("filtered", True)
        s = "%d negative responses were received\n" % \
            len(self.results_with_negative_response)

        s += "\n"

        # A single newline is only emitted if no details are provided. This
        # is what the former expression `details or "" + "\n"` evaluated to,
        # since `+` binds tighter than `or`.
        s += self._show_negative_response_details(**kwargs) or "\n"
        if filtered and len(self.negative_response_blacklist):
            s += "The following negative response codes are blacklisted: %s\n" \
                 % [self._get_negative_response_desc(nr)
                    for nr in self.negative_response_blacklist]
        return s + "\n"

    def _show_results_information(self, **kwargs):
        # type: (Any) -> str
        """
        Renders the results as table.

        :param kwargs: Supported arguments:
                         - filtered: True/False (default True). Selects
                           `filtered_results` instead of all results.
        :return: The table as string, or a placeholder if no data is present
        """
        def _get_table_entry(*args: Any
                             ):  # type: (...) -> Tuple[str, str, str]
            # make_lined_table hands over the fields of a result tuple as
            # positional arguments
            tup = cast(_AutomotiveTestCaseScanResult, args)
            return self._get_table_entry_x(tup), \
                self._get_table_entry_y(tup), \
                self._get_table_entry_z(tup)

        filtered = kwargs.get("filtered", True)
        s = "=== No data to display ===\n"
        data = self._results if not filtered else self.filtered_results  # type: Union[List[_AutomotiveTestCaseScanResult], List[_AutomotiveTestCaseFilteredScanResult]]  # noqa: E501
        if len(data):
            s = make_lined_table(
                data, _get_table_entry, dump=True, sortx=str) or ""

        return s + "\n"

    def show(self, dump=False, filtered=True, verbose=False):
        # type: (bool, bool, bool) -> Optional[str]
        """
        Shows the results of this enumerator.

        :param dump: If True, the output is returned as string instead of
                     being printed
        :param filtered: If True, the negative response blacklist is updated
                         and applied to the shown results
        :param verbose: If True, state information is shown additionally
        :return: The output as string if dump is True, otherwise None
        """
        if filtered:
            self._prepare_negative_response_blacklist()

        show_functions = [self._show_header,
                          self._show_statistics,
                          self._show_negative_response_information,
                          self._show_results_information]

        if verbose:
            show_functions.append(self._show_state_information)

        s = "\n".join(x(filtered=filtered) for x in show_functions)

        if dump:
            return s + "\n"
        else:
            print(s)
            return None

    def _get_label(self, response, positive_case="PR: PositiveResponse"):
        # type: (Optional[Packet], Union[Callable[[Packet], str], str]) -> str
        """
        Provides a label for a response.

        :param response: Received response or None
        :param positive_case: Label, or function which creates a label from
                              the response, for a positive response
        :return: "Timeout" if response is None, the negative response label
                 if the first byte of the response is 0x7f, otherwise the
                 label for the positive case
        :raises Scapy_Exception: positive_case is neither string nor callable
        """
        if response is None:
            return "Timeout"
        elif orb(bytes(response)[0]) == 0x7f:
            return self._get_negative_response_label(response)
        else:
            if isinstance(positive_case, str):
                return positive_case
            elif callable(positive_case):
                return positive_case(response)
            else:
                raise Scapy_Exception("Unsupported Type for positive_case. "
                                      "Provide a string or a function.")

    @property
    def supported_responses(self):
        # type: () -> List[EcuResponse]
        """
        Creates EcuResponse objects from the stored packets.

        :return: One EcuResponse for every stored packet whose first byte
                 has the bit 0x40 set, together with all states in which
                 this packet was received as response
        """
        supported_resps = list()
        # NOTE(review): bit 0x40 presumably marks response packets; confirm
        # against the protocol definition.
        all_responses = [p for p in self._result_packets.values()
                         if orb(bytes(p)[0]) & 0x40]
        # The property builds a new list on every access, fetch it only once
        answered = self.results_with_response
        for resp in all_responses:
            states = list(set([t.state for t in answered
                               if t.resp == resp]))
            supported_resps.append(EcuResponse(state=states, responses=resp))
        return supported_resps


class StateGeneratingServiceEnumerator(
    ServiceEnumerator, StateGenerator, metaclass=abc.ABCMeta
):
    """
    ServiceEnumerator which additionally identifies new edges (transitions
    between EcuStates) from the last result.
    """

    def __init__(self):
        # type: () -> None
        """
        Initializes the empty edge storage.
        """
        super(StateGeneratingServiceEnumerator, self).__init__()

        # Internal storage of request packets for a certain Edge. If an edge
        # is found during the evaluation of the last result of the
        # ServiceEnumerator, the according request of the result tuple is
        # stored together with the new Edge.
        self._edge_requests = dict()  # type: Dict[_Edge, Packet]

    def get_new_edge(self,
                     socket,  # type: _SocketUnion
                     config  # type: AutomotiveTestCaseExecutorConfiguration
                     ):  # type: (...) -> Optional[_Edge]
        """
        Basic identification of a new edge. The last response is evaluated.
        If this response packet can modify the state of an Ecu, this new
        state is returned, otherwise None.

        :param socket: Socket to the DUT (unused)
        :param config: Global configuration of the executor (unused)
        :return: tuple of old EcuState and new EcuState, or None
        """
        try:
            state, req, resp, _, _ = cast(ServiceEnumerator, self).results[-1]
        except IndexError:
            return None

        if resp is not None and EcuState.is_modifier_pkt(resp):
            new_state = EcuState.get_modified_ecu_state(resp, req, state)
            if new_state == state:
                return None
            else:
                edge = (state, new_state)
                self._edge_requests[edge] = req
                return edge
        else:
            return None

    @staticmethod
    def transition_function(
            sock,  # type: _SocketUnion
            config,  # type: AutomotiveTestCaseExecutorConfiguration
            kwargs  # type: Dict[str, Any]
    ):
        # type: (...) -> bool
        """
        Very basic transition function. This function sends a given request
        in kwargs and evaluates the response.

        :param sock: Connection to the DUT
        :param config: Global configuration of the executor (unused)
        :param kwargs: Dictionary with arguments. This function only uses
                       the argument *"req"* which must contain a Packet,
                       causing an EcuState transition of the DUT.
        :return: True in case of a successful transition, else False
        """
        req = kwargs.get("req", None)
        if req is None:
            return False

        try:
            res = sock.sr1(req, timeout=20, verbose=False, chainEX=True)
            return res is not None and res.service != 0x7f
        except (OSError, ValueError, Scapy_Exception) as e:
            log_automotive.exception(
                "Exception in transition function: %s", e)
            return False

    def get_transition_function_description(self, edge):
        # type: (_Edge) -> str
        """
        :param edge: Edge of interest
        :return: repr of the request which was stored for this edge
        :raises KeyError: no request was stored for this edge
        """
        return repr(self._edge_requests[edge])

    def get_transition_function_kwargs(self, edge):
        # type: (_Edge) -> Dict[str, Any]
        """
        :param edge: Edge of interest
        :return: Dictionary with the description ("desc") and the request
                 ("req") for `transition_function`
        :raises KeyError: no request was stored for this edge
        """
        req = self._edge_requests[edge]
        kwargs = {
            "desc": self.get_transition_function_description(edge),
            "req": req
        }
        return kwargs

    def get_transition_function(self, socket, edge):
        # type: (_SocketUnion, _Edge) -> Optional[_TransitionTuple]
        """
        :param socket: Socket to the DUT (unused)
        :param edge: Edge of interest
        :return: Tuple of transition function, its kwargs and None, or None
                 if no request was stored for this edge
        """
        try:
            return self.transition_function, \
                self.get_transition_function_kwargs(edge), None
        except KeyError:
            return None