# SPDX-License-Identifier: GPL-2.0-only
# This file is part of Scapy
# See https://scapy.net/ for more information
# Copyright (C) Nils Weiss <nils@we155.de>

# scapy.contrib.description = ServiceEnumerator definitions
# scapy.contrib.status = library


import abc
import threading
import time
import copy
from collections import defaultdict, OrderedDict
from itertools import chain
from typing import NamedTuple

from scapy.compat import orb
from scapy.contrib.automotive import log_automotive
from scapy.error import Scapy_Exception
from scapy.utils import make_lined_table, EDecimal, PeriodicSenderThread
from scapy.packet import Packet
from scapy.contrib.automotive.ecu import EcuState, EcuResponse
from scapy.contrib.automotive.scanner.test_case import AutomotiveTestCase, \
    StateGenerator, _SocketUnion, _TransitionTuple
from scapy.contrib.automotive.scanner.configuration import \
    AutomotiveTestCaseExecutorConfiguration
from scapy.contrib.automotive.scanner.graph import _Edge

# Typing imports
from typing import (
    Any,
    Union,
    List,
    Optional,
    Iterable,
    Dict,
    Tuple,
    Set,
    Callable,
    cast,
)

# Definition outside the class ServiceEnumerator to allow pickling
# One entry per sent request; ``resp`` and ``resp_ts`` are None if no
# response was received.
_AutomotiveTestCaseScanResult = NamedTuple(
    "_AutomotiveTestCaseScanResult",
    [("state", EcuState),
     ("req", Packet),
     ("resp", Optional[Packet]),
     ("req_ts", Union[EDecimal, float]),
     ("resp_ts", Optional[Union[EDecimal, float]])])

# Same layout as above, used for results that are known to have a response.
_AutomotiveTestCaseFilteredScanResult = NamedTuple(
    "_AutomotiveTestCaseFilteredScanResult",
    [("state", EcuState),
     ("req", Packet),
     ("resp", Packet),
     ("req_ts", Union[EDecimal, float]),
     ("resp_ts", Union[EDecimal, float])])


class ServiceEnumerator(AutomotiveTestCase, metaclass=abc.ABCMeta):
    """
    Base class for ServiceEnumerators of automotive diagnostic protocols
    """

    _supported_kwargs = copy.copy(AutomotiveTestCase._supported_kwargs)
    _supported_kwargs.update({
        'timeout': ((int, float), lambda x: x > 0),
        'count': (int, lambda x: x >= 0),
        'execution_time': (int, None),
        'state_allow_list': ((list, EcuState), None),
        'state_block_list': ((list, EcuState), None),
        'retry_if_none_received': (bool, None),
        'exit_if_no_answer_received': (bool, None),
        'exit_if_service_not_supported': (bool, None),
        'exit_scan_on_first_negative_response': (bool, None),
        'retry_if_busy_returncode': (bool, None),
        'stop_event': (threading.Event, None),
        'debug': (bool, None),
        'scan_range': ((list, tuple, range), None),
        'unittest': (bool, None),
        'disable_tps_while_sending': (bool, None),
        'inter': ((int, float), lambda x: x >= 0),
    })

    _supported_kwargs_doc = AutomotiveTestCase._supported_kwargs_doc + """
        :param timeout: Timeout until a response will arrive after a request
        :type timeout: integer or float
        :param integer count: Number of request to be sent in one execution
        :param int execution_time: Time in seconds until the execution of
                                   this enumerator is stopped.
        :param state_allow_list: List of EcuState objects or EcuState object
                                 in which the the execution of this enumerator
                                 is allowed. If provided, other states will not
                                 be executed.
        :type state_allow_list: EcuState or list
        :param state_block_list: List of EcuState objects or EcuState object
                                 in which the the execution of this enumerator
                                 is blocked.
        :type state_block_list: EcuState or list
        :param bool retry_if_none_received: Specifies if a request will be send
                                            again, if None was received
                                            (usually because of a timeout).
        :param bool exit_if_no_answer_received: Specifies to finish the
                                                execution of this enumerator
                                                once None is received.
        :param bool exit_if_service_not_supported: Specifies to finish the
                                                   execution of this
                                                   enumerator, once the
                                                   negative return code
                                                   'serviceNotSupported' is
                                                   received.
        :param bool exit_scan_on_first_negative_response: Specifies to finish
                                                          the execution once a
                                                          negative response is
                                                          received.
        :param bool retry_if_busy_returncode: Specifies to retry a request, if
                                              the 'busyRepeatRequest' negative
                                              response code is received.
        :param bool debug: Enables debug functions during execute.
        :param Event stop_event: Signals immediate stop of the execution.
        :param scan_range: Specifies the identifiers to be scanned.
        :type scan_range: list or tuple or range or iterable
        :param disable_tps_while_sending: Temporary disables a TesterPresentSender
                                          to not interact with a seed request.
        :type disable_tps_while_sending: bool
        :param inter: delay between two packets during sending
        :type inter: int or float"""

    def __init__(self):
        # type: () -> None
        super(ServiceEnumerator, self).__init__()
        # Cache of unique request/response packets, keyed by their bytes
        self._result_packets = OrderedDict()  # type: Dict[bytes, Packet]
        # One entry per request that was sent during execute()
        self._results = list()  # type: List[_AutomotiveTestCaseScanResult]
        # Per-state iterator over the requests that still have to be sent
        self._request_iterators = dict()  # type: Dict[EcuState, Iterable[Packet]]  # noqa: E501
        # Per-state storage of the request(s) to be sent again first
        self._retry_pkt = defaultdict(list)  # type: Dict[EcuState, Union[Packet, Iterable[Packet]]]  # noqa: E501
        # Negative response codes that are hidden in filtered results
        self._negative_response_blacklist = [0x10, 0x11]  # type: List[int]
        self._requests_per_state_estimated = None  # type: Optional[int]
        self._tester_present_sender = None  # type: Optional[PeriodicSenderThread]  # noqa: E501

    @staticmethod
    @abc.abstractmethod
    def _get_negative_response_code(resp):
        # type: (Packet) -> int
        """
        Protocol specific accessor, to be implemented by subclasses.
        :param resp: A negative response packet
        :return: The negative response code of this packet
        """
        raise NotImplementedError()

    @staticmethod
    @abc.abstractmethod
    def _get_negative_response_desc(nrc):
        # type: (int) -> str
        """
        Protocol specific accessor, to be implemented by subclasses.
        :param nrc: A negative response code
        :return: A string which describes the negative response code
        """
        raise NotImplementedError()

    def _get_table_entry_x(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        """
        Provides a table entry for the column which gets print during `show()`.
        :param tup: A results tuple
        :return: A string which describes the state
        """
        return str(tup[0])

    def _get_table_entry_y(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        """
        Provides a table entry for the line which gets print during `show()`.
        :param tup: A results tuple
        :return: A string which describes the request
        """
        return repr(tup[1])

    def _get_table_entry_z(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        """
        Provides a table entry for the field which gets print during `show()`.
        :param tup: A results tuple
        :return: A string which describes the response
        """
        return repr(tup[2])

    @staticmethod
    @abc.abstractmethod
    def _get_negative_response_label(response):
        # type: (Packet) -> str
        """
        Protocol specific accessor, to be implemented by subclasses.
        :param response: A negative response packet
        :return: A label for this negative response, used by `_get_label`
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def _get_initial_requests(self, **kwargs):
        # type: (Any) -> Iterable[Packet]
        """
        Provides the requests that are sent in every state. To be
        implemented by subclasses.
        :param kwargs: The remaining keyword arguments of `execute`
        :return: An iterable of request packets
        """
        raise NotImplementedError("Overwrite this method")

    def __reduce__(self):  # type: ignore
        """
        Extends the inherited `__reduce__` by replacing all stored request
        and retry iterators in the state dictionary with lists.
        :return: The reduce tuple of the super class with a modified state
        """
        f, t, d = super(ServiceEnumerator, self).__reduce__()  # type: ignore
        try:
            for k, v in d["_request_iterators"].items():
                d["_request_iterators"][k] = list(v)
        except KeyError:
            pass

        try:
            for k in d["_retry_pkt"]:
                d["_retry_pkt"][k] = list(self._get_retry_iterator(k))
        except KeyError:
            pass
        return f, t, d

    @property
    def negative_response_blacklist(self):
        # type: () -> List[int]
        """
        :return: The list of negative response codes which are removed from
                 `filtered_results`. The internal list object is returned.
        """
        return self._negative_response_blacklist

    @property
    def completed(self):
        # type: () -> bool
        """
        :return: True, if all states found in the results are completed.
                 Without any result, the value of the super class is used.
        """
        if len(self._results):
            # The list is built on purpose: has_completed is called for
            # every scanned state, without short-circuiting.
            return all([self.has_completed(s) for s in self.scanned_states])
        else:
            return super(ServiceEnumerator, self).completed

    def _store_result(self, state, req, res):
        # type: (EcuState, Packet, Optional[Packet]) -> None
        """
        Appends a new result tuple to the results. Request and response
        are de-duplicated through the internal packet cache, so that equal
        packets (same bytes) share one Packet object.
        :param state: State in which the request was sent
        :param req: Sent request
        :param res: Received response or None
        """
        req_key = bytes(req)
        if req_key not in self._result_packets:
            self._result_packets[req_key] = req

        # Use the same ``is not None`` test for storing and for the lookup
        # below, otherwise a falsy response object causes a KeyError.
        if res is not None:
            res_key = bytes(res)
            if res_key not in self._result_packets:
                self._result_packets[res_key] = res
            stored_res = self._result_packets[res_key]  # type: Optional[Packet]  # noqa: E501
            res_ts = res.time
        else:
            stored_res = None
            res_ts = None

        self._results.append(_AutomotiveTestCaseScanResult(
            state,
            self._result_packets[req_key],
            stored_res,
            req.sent_time or 0.0,
            res_ts))

    def _get_retry_iterator(self, state):
        # type: (EcuState) -> Iterable[Packet]
        """
        Provides the stored retry request(s) of a state as iterable.
        :param state: State of interest
        :return: A list with the single retry packet, or the stored
                 list/iterable itself (empty list if nothing is stored)
        """
        retry_entry = self._retry_pkt[state]
        if isinstance(retry_entry, Packet):
            log_automotive.debug("Provide retry packet")
            return [retry_entry]
        elif isinstance(retry_entry, list):
            if len(retry_entry):
                log_automotive.debug("Provide retry list")
        else:
            log_automotive.debug("Provide retry iterator")
            # assume self.retry_pkt is a generator or list

        return retry_entry

    def _get_initial_request_iterator(self, state, **kwargs):
        # type: (EcuState, Any) -> Iterable[Packet]
        """
        Provides the request iterator of a state. The iterator is created
        on first access and then re-used, so a later execution in the same
        state continues where the previous one stopped.
        :param state: State of interest
        :param kwargs: Forwarded to `_get_initial_requests`
        :return: The iterator over the remaining requests of this state
        """
        if state not in self._request_iterators:
            self._request_iterators[state] = iter(
                self._get_initial_requests(**kwargs))

        return self._request_iterators[state]

    def _get_request_iterator(self, state, **kwargs):
        # type: (EcuState, Optional[Dict[str, Any]]) -> Iterable[Packet]
        """
        :param state: State of interest
        :param kwargs: Forwarded to `_get_initial_requests`
        :return: Iterable which first yields the retry requests of this
                 state, followed by the remaining initial requests
        """
        return chain(self._get_retry_iterator(state),
                     self._get_initial_request_iterator(state, **kwargs))

    def _prepare_runtime_estimation(self, **kwargs):
        # type: (Optional[Dict[str, Any]]) -> None
        """
        Counts the initial requests once and stores the number for
        `runtime_estimation`. Nothing is stored if `_get_initial_requests`
        raises NotImplementedError.
        :param kwargs: Forwarded to `_get_initial_requests`
        """
        if self._requests_per_state_estimated is None:
            try:
                initial_requests = self._get_initial_requests(**kwargs)
                self._requests_per_state_estimated = \
                    len(list(initial_requests))
            except NotImplementedError:
                pass

    def runtime_estimation(self):
        # type: () -> Optional[Tuple[int, int, float]]
        """
        :return: None, if no estimation is available. Otherwise a tuple of
                 the number of packets to be sent (at least 1), the number
                 of stored results and the ratio of both.
        """
        if self._requests_per_state_estimated is None:
            return None

        pkts_tbs = max(
            len(self.scanned_states) * self._requests_per_state_estimated, 1)
        pkts_snt = len(self.results)

        return pkts_tbs, pkts_snt, float(pkts_snt) / pkts_tbs

    def pre_execute(self, socket, state, global_configuration):
        # type: (_SocketUnion, EcuState, AutomotiveTestCaseExecutorConfiguration) -> None  # noqa: E501
        """
        Picks up the entry "tps" of the global configuration as
        TesterPresentSender. None is stored if there is no such entry.
        :param socket: Socket to the DUT (unused)
        :param state: Current state (unused)
        :param global_configuration: Global configuration of the executor
        """
        try:
            self._tester_present_sender = global_configuration["tps"]
        except KeyError:
            self._tester_present_sender = None

    def execute(self, socket, state, **kwargs):
        # type: (_SocketUnion, EcuState, Any) -> None
        # NOTE: The docstring of this function is assigned below from
        #       _supported_kwargs_doc. Do not add a docstring here.
        self.check_kwargs(kwargs)
        timeout = kwargs.pop('timeout', 1)
        count = kwargs.pop('count', None)
        execution_time = kwargs.pop("execution_time", 1200)
        stop_event = kwargs.pop("stop_event", None)  # type: Optional[threading.Event]  # noqa: E501
        disable_tps = kwargs.pop("disable_tps_while_sending", False)
        inter = kwargs.pop("inter", 0)

        self._prepare_runtime_estimation(**kwargs)

        state_block_list = kwargs.get('state_block_list', list())

        if state_block_list and state in state_block_list:
            self._state_completed[state] = True
            log_automotive.debug("State %s in block list!", repr(state))
            return

        state_allow_list = kwargs.get('state_allow_list', list())

        if state_allow_list and state not in state_allow_list:
            self._state_completed[state] = True
            log_automotive.debug("State %s not in allow list!",
                                 repr(state))
            return

        it = self._get_request_iterator(state, **kwargs)

        # log_automotive.debug("[i] Using iterator %s in state %s", it, state)

        start_time = time.monotonic()
        log_automotive.debug(
            "Start execution of enumerator: %s", time.ctime())

        for req in it:
            # Delay between two requests. Waiting on the stop event allows
            # to leave the delay early once the event is set.
            if stop_event:
                stop_event.wait(timeout=inter)
            else:
                time.sleep(inter)

            if disable_tps and self._tester_present_sender:
                self._tester_present_sender.disable()

            try:
                res = self.sr1_with_retry_on_error(
                    req, socket, state, timeout)
            finally:
                # Re-enable the sender even if sr1 raised, otherwise it
                # would stay disabled for the rest of the scan.
                if disable_tps and self._tester_present_sender:
                    self._tester_present_sender.enable()

            self._store_result(state, req, res)

            if self._evaluate_response(state, req, res, **kwargs):
                log_automotive.debug(
                    "Stop test_case execution because of response evaluation")
                return

            if count is not None:
                count -= 1
                if count <= 0:
                    log_automotive.debug(
                        "Finished execution count of enumerator")
                    return

            if (start_time + execution_time) < time.monotonic():
                log_automotive.debug(
                    "[i] Finished execution time of enumerator: %s",
                    time.ctime())
                return

            if stop_event is not None and stop_event.is_set():
                log_automotive.info(
                    "Stop test_case execution because of stop event")
                return

        # Only reached if the request iterator is exhausted
        log_automotive.info("Finished iterator execution")
        self._state_completed[state] = True
        log_automotive.debug("States completed %s",
                             repr(self._state_completed))

    execute.__doc__ = _supported_kwargs_doc

    def sr1_with_retry_on_error(self, req, socket, state, timeout):
        # type: (Packet, _SocketUnion, EcuState, int) -> Optional[Packet]
        """
        Sends a request and returns the response. If sending fails, the
        request is stored for a retry and the exception is re-raised.
        :param req: Request to send
        :param socket: Socket to the DUT
        :param state: Current state
        :param timeout: Timeout for the response
        :return: The return value of `socket.sr1`
        :raises OSError, ValueError, Scapy_Exception: re-raised from sr1
        """
        try:
            res = socket.sr1(req, timeout=timeout, verbose=False,
                             chainEX=True, chainCC=True)
        except (OSError, ValueError, Scapy_Exception):
            if not self._populate_retry(state, req):
                log_automotive.exception(
                    "Exception during retry. This is bad")
            raise
        return res

    def _evaluate_response(self,
                           state,  # type: EcuState
                           request,  # type: Packet
                           response,  # type: Optional[Packet]
                           **kwargs  # type: Optional[Dict[str, Any]]
                           ):  # type: (...) -> bool
        """
        Evaluates the response and determines if the current scan execution
        should be stopped.
        :param state: Current state of the ECU under test
        :param request: Sent request
        :param response: Received response
        :param kwargs: Arguments to modify the behavior of this function.
                       Supported arguments:
                         - retry_if_none_received: True/False
                         - exit_if_no_answer_received: True/False
                         - exit_if_service_not_supported: True/False
                         - exit_scan_on_first_negative_response: True/False
                         - retry_if_busy_returncode: True/False
        :return: True, if current execution needs to be interrupted.
                 False, if enumerator should proceed with the execution.
        """
        if response is None:
            if cast(bool, kwargs.pop("retry_if_none_received", False)):
                log_automotive.debug(
                    "Retry %s because None received", repr(request))
                return self._populate_retry(state, request)
            return cast(bool, kwargs.pop("exit_if_no_answer_received", False))

        if self._evaluate_negative_response_code(
                state, response, **kwargs):
            # leave current execution, because of a negative response code
            return True

        if self._evaluate_retry(state, request, response, **kwargs):
            # leave current execution, because a retry was set
            return True

        # cleanup retry packet
        self._retry_pkt[state] = []

        return self._evaluate_ecu_state_modifications(state, request, response)

    def _evaluate_ecu_state_modifications(self,
                                          state,  # type: EcuState
                                          request,  # type: Packet
                                          response,  # type: Packet
                                          ):  # type: (...) -> bool
        """
        Checks if the response changed the state of the ECU.
        :param state: Current state of the ECU under test
        :param request: Sent request
        :param response: Received response
        :return: True, if the response is a modifier packet and the
                 resulting state differs from the current state.
        """
        if EcuState.is_modifier_pkt(response):
            if state != EcuState.get_modified_ecu_state(
                    response, request, state):
                log_automotive.debug(
                    "Exit execute. Ecu state was modified!")
                return True
        return False

    def _evaluate_negative_response_code(self,
                                         state,  # type: EcuState
                                         response,  # type: Packet
                                         **kwargs  # type: Optional[Dict[str, Any]]  # noqa: E501
                                         ):  # type: (...) -> bool
        """
        Checks if a negative response requires to stop the execution.
        :param state: Current state of the ECU under test
        :param response: Received response
        :param kwargs: Supported arguments:
                         - exit_if_service_not_supported: True/False
                         - exit_scan_on_first_negative_response: True/False
        :return: True, if current execution needs to be interrupted.
        """
        exit_if_service_not_supported = \
            kwargs.pop("exit_if_service_not_supported", False)
        exit_scan_on_first_negative_response = \
            kwargs.pop("exit_scan_on_first_negative_response", False)

        if exit_scan_on_first_negative_response and response.service == 0x7f:
            return True

        if exit_if_service_not_supported and response.service == 0x7f:
            response_code = self._get_negative_response_code(response)
            if response_code in [0x11, 0x7f]:
                names = {0x11: "serviceNotSupported",
                         0x7f: "serviceNotSupportedInActiveSession"}
                log_automotive.debug(
                    "Exit execute because negative response %s received!",
                    names[response_code])
                # execute of current state is completed,
                # since a serviceNotSupported negative response was received
                self._state_completed[state] = True
                # stop current execute and exit
                return True
        return False

    def _populate_retry(self,
                        state,  # type: EcuState
                        request,  # type: Packet
                        ):  # type: (...) -> bool
        """
        Populates internal storage with request for a retry.

        :param state: Current state
        :param request: Request which needs a retry
        :return: True, if storage was populated. If False is returned, the
                 retry storage is still populated. This indicates that the
                 current execution was already a retry execution.
        """

        if not self._get_retry_iterator(state):
            # This was no retry since the retry_pkt is None
            self._retry_pkt[state] = request
            log_automotive.debug(
                "Exit execute. Retry packet next time!")
            return True
        else:
            # This was a unsuccessful retry, continue execute
            log_automotive.debug("Unsuccessful retry!")
            return False

    def _evaluate_retry(self,
                        state,  # type: EcuState
                        request,  # type: Packet
                        response,  # type: Packet
                        **kwargs  # type: Optional[Dict[str, Any]]
                        ):  # type: (...) -> bool
        """
        Stores the request for a retry, if the negative response code 0x21
        was received and `retry_if_busy_returncode` (default True) is set.
        :param state: Current state of the ECU under test
        :param request: Sent request
        :param response: Received response
        :param kwargs: Supported arguments:
                         - retry_if_busy_returncode: True/False
        :return: The result of `_populate_retry` if a retry is necessary,
                 otherwise False.
        """
        retry_if_busy_returncode = \
            kwargs.pop("retry_if_busy_returncode", True)

        if retry_if_busy_returncode and response.service == 0x7f \
                and self._get_negative_response_code(response) == 0x21:
            log_automotive.debug(
                "Retry %s because retry_if_busy_returncode received",
                repr(request))
            return self._populate_retry(state, request)
        return False

    def _compute_statistics(self):
        # type: () -> List[Tuple[str, str, str]]
        """
        Computes counters and answer times over all results and per state.
        :return: List of tuples (data set description, statistic name,
                 value as string). Times which can't be computed are "-".
        """
        data_sets = [("all", self._results)]

        for state in self._state_completed.keys():
            data_sets.append((repr(state),
                              [r for r in self._results if r.state == state]))

        stats = list()  # type: List[Tuple[str, str, str]]

        for desc, data in data_sets:
            answered = [cast(_AutomotiveTestCaseFilteredScanResult, r)
                        for r in data if r.resp is not None and
                        r.resp_ts is not None]
            unanswered = [r for r in data if r.resp is None]
            answertimes = [float(x.resp_ts) - float(x.req_ts)
                           for x in answered]
            answertimes_nr = [float(x.resp_ts) - float(x.req_ts)
                              for x in answered if x.resp.service == 0x7f]
            answertimes_pr = [float(x.resp_ts) - float(x.req_ts)
                              for x in answered if x.resp.service != 0x7f]

            nrs = [r.resp for r in answered if r.resp.service == 0x7f]
            stats.append((desc, "num_answered", str(len(answered))))
            stats.append((desc, "num_unanswered", str(len(unanswered))))
            stats.append((desc, "num_negative_resps", str(len(nrs))))

            for postfix, times in zip(
                    ["", "_nr", "_pr"],
                    [answertimes, answertimes_nr, answertimes_pr]):
                # max/min raise ValueError on an empty list
                try:
                    ma = str(round(max(times), 5))
                except ValueError:
                    ma = "-"

                try:
                    mi = str(round(min(times), 5))
                except ValueError:
                    mi = "-"

                try:
                    avg = str(round(sum(times) / len(times), 5))
                except (ValueError, ZeroDivisionError):
                    avg = "-"

                stats.append((desc, "answertime_min" + postfix, mi))
                stats.append((desc, "answertime_max" + postfix, ma))
                stats.append((desc, "answertime_avg" + postfix, avg))

        return stats

    def _show_statistics(self, **kwargs):
        # type: (Any) -> str
        """
        :param kwargs: Unused
        :return: A summary line followed by the statistics table
        """
        stats = self._compute_statistics()

        s = "%d requests were sent, %d answered, %d unanswered" % \
            (len(self._results),
             len(self.results_with_response),
             len(self.results_without_response)) + "\n"

        s += "Statistics per state\n"
        s += make_lined_table(stats, lambda *x: x, dump=True, sortx=str,
                              sorty=str) or ""

        return s + "\n"

    def _prepare_negative_response_blacklist(self):
        # type: () -> None
        """
        Updates the negative response blacklist from the received negative
        responses. A code is added if it was received more than 30 times
        and accounts for more than 30% of all negative responses. A code
        is removed if it was received less than 10 times.
        """
        nrc_dict = defaultdict(int)  # type: Dict[int, int]
        negative_results = self.results_with_negative_response
        for nr in negative_results:
            nrc_dict[self._get_negative_response_code(nr.resp)] += 1

        total_nr_count = len(negative_results)
        for nrc, nr_count in nrc_dict.items():
            if nrc not in self.negative_response_blacklist and \
                    nr_count > 30 and (nr_count / total_nr_count) > 0.3:
                log_automotive.info("Added NRC 0x%02x to filter", nrc)
                self.negative_response_blacklist.append(nrc)

            if nrc in self.negative_response_blacklist and nr_count < 10:
                log_automotive.info("Removed NRC 0x%02x from filter", nrc)
                self.negative_response_blacklist.remove(nrc)

    @property
    def results(self):
        # type: () -> List[_AutomotiveTestCaseScanResult]
        """
        :return: all results. The internal list object is returned.
        """
        return self._results

    @property
    def results_with_response(self):
        # type: () -> List[_AutomotiveTestCaseFilteredScanResult]
        """
        Helper function to get all results with a response
        :return: new list of all results which have a response and a
                 response timestamp
        """
        filtered_results = list()
        for r in self._results:
            if r.resp is None:
                continue
            if r.resp_ts is None:
                continue
            fr = cast(_AutomotiveTestCaseFilteredScanResult, r)
            filtered_results.append(fr)
        return filtered_results

    @property
    def filtered_results(self):
        # type: () -> List[_AutomotiveTestCaseFilteredScanResult]
        """
        Helper function to get all results without blacklisted responses
        :return: all results with positive response, followed by all
                 results with a negative response whose code is not in
                 the negative response blacklist
        """
        # results_with_positive_response builds a new list on every access,
        # therefore appending to it doesn't modify any internal state.
        filtered_results = self.results_with_positive_response

        for r in self.results_with_negative_response:
            nrc = self._get_negative_response_code(r.resp)
            if nrc not in self.negative_response_blacklist:
                filtered_results.append(r)
        return filtered_results

    @property
    def scanned_states(self):
        # type: () -> Set[EcuState]
        """
        Helper function to get all scanned states in results
        :return: all scanned states
        """
        return set([tup.state for tup in self._results])

    @property
    def results_with_negative_response(self):
        # type: () -> List[_AutomotiveTestCaseFilteredScanResult]
        """
        Helper function to get all results with negative response
        :return: all results with negative response
        """
        return [r for r in self.results_with_response
                if r.resp and r.resp.service == 0x7f]

    @property
    def results_with_positive_response(self):
        # type: () -> List[_AutomotiveTestCaseFilteredScanResult]
        """
        Helper function to get all results with positive response
        :return: all results with positive response
        """
        return [r for r in self.results_with_response  # noqa: E501
                if r.resp and r.resp.service != 0x7f]

    @property
    def results_without_response(self):
        # type: () -> List[_AutomotiveTestCaseScanResult]
        """
        Helper function to get all results without response
        :return: all results without response
        """
        return [r for r in self._results if r.resp is None]

    def _show_negative_response_details(self, **kwargs):
        # type: (Any) -> str
        """
        :param kwargs: Unused
        :return: A text which lists every received negative response code
                 with its description and the number of occurrences
        """
        nrc_dict = defaultdict(int)  # type: Dict[int, int]
        for nr in self.results_with_negative_response:
            nrc_dict[self._get_negative_response_code(nr.resp)] += 1

        s = "These negative response codes were received " + \
            " ".join([hex(c) for c in nrc_dict.keys()]) + "\n"
        for nrc, nr_count in nrc_dict.items():
            s += "\tNRC 0x%02x: %s received %d times" % (
                nrc, self._get_negative_response_desc(nrc), nr_count)
            s += "\n"

        return s + "\n"

    def _show_negative_response_information(self, **kwargs):
        # type: (Any) -> str
        """
        :param kwargs: Supported arguments:
                         - filtered: True/False (default True). If True,
                           the blacklisted codes are listed as well.
        :return: A text with information about all negative responses
        """
        filtered = kwargs.get("filtered", True)
        s = "%d negative responses were received\n" % \
            len(self.results_with_negative_response)

        s += "\n"

        # Previously written as `details or "" + "\n"`, which Python
        # evaluates as `details or ("" + "\n")`. Spelled out here with
        # identical behavior.
        s += self._show_negative_response_details(**kwargs) or "\n"
        if filtered and len(self.negative_response_blacklist):
            s += "The following negative response codes are blacklisted: %s\n"\
                 % [self._get_negative_response_desc(nr)
                    for nr in self.negative_response_blacklist]

        return s + "\n"

    def _show_results_information(self, **kwargs):
        # type: (Any) -> str
        """
        :param kwargs: Supported arguments:
                         - filtered: True/False (default True). Selects
                           `filtered_results` instead of all results.
        :return: The results as table, or a note if there is no data
        """
        def _get_table_entry(
                *args: Any
        ):  # type: (...) -> Tuple[str, str, str]
            # make_lined_table passes the fields of a result tuple as
            # positional arguments
            tup = cast(_AutomotiveTestCaseScanResult, args)
            return self._get_table_entry_x(tup), \
                self._get_table_entry_y(tup), \
                self._get_table_entry_z(tup)

        filtered = kwargs.get("filtered", True)
        s = "=== No data to display ===\n"
        data = self._results if not filtered else self.filtered_results  # type: Union[List[_AutomotiveTestCaseScanResult], List[_AutomotiveTestCaseFilteredScanResult]]  # noqa: E501
        if len(data):
            s = make_lined_table(
                data, _get_table_entry, dump=True, sortx=str) or ""

        return s + "\n"

    def show(self, dump=False, filtered=True, verbose=False):
        # type: (bool, bool, bool) -> Optional[str]
        """
        Shows header, statistics, negative response information and the
        results of this enumerator.
        :param dump: If True, the text is returned instead of printed
        :param filtered: If True, the negative response blacklist is
                         updated and applied to the shown results
        :param verbose: If True, state information is shown additionally
        :return: The text if `dump` is True, otherwise None
        """
        if filtered:
            self._prepare_negative_response_blacklist()

        show_functions = [self._show_header,
                          self._show_statistics,
                          self._show_negative_response_information,
                          self._show_results_information]

        if verbose:
            show_functions.append(self._show_state_information)

        s = "\n".join(x(filtered=filtered) for x in show_functions)

        if dump:
            return s + "\n"
        else:
            print(s)
            return None

    def _get_label(self, response, positive_case="PR: PositiveResponse"):
        # type: (Optional[Packet], Union[Callable[[Packet], str], str]) -> str
        """
        Provides a label for a response.
        :param response: Received response or None
        :param positive_case: A string, or a function which creates a
                              string from the response. Only used if the
                              first byte of the response isn't 0x7f.
        :return: "Timeout" for None, the negative response label if the
                 first byte is 0x7f, otherwise the label of positive_case
        :raises Scapy_Exception: positive_case is neither str nor callable
        """
        if response is None:
            return "Timeout"
        elif orb(bytes(response)[0]) == 0x7f:
            return self._get_negative_response_label(response)
        else:
            if isinstance(positive_case, str):
                return positive_case
            elif callable(positive_case):
                return positive_case(response)
            else:
                raise Scapy_Exception("Unsupported Type for positive_case. "
                                      "Provide a string or a function.")

    @property
    def supported_responses(self):
        # type: () -> List[EcuResponse]
        """
        :return: One EcuResponse for every stored packet whose first byte
                 has bit 0x40 set, together with all states in which this
                 packet was received as response
        """
        supported_resps = list()
        all_responses = [p for p in self._result_packets.values()
                         if orb(bytes(p)[0]) & 0x40]
        # The property creates a new list on every access, evaluate it once
        answered = self.results_with_response
        for resp in all_responses:
            states = list(set([t.state for t in answered
                               if t.resp == resp]))
            supported_resps.append(EcuResponse(state=states, responses=resp))
        return supported_resps


class StateGeneratingServiceEnumerator(
    ServiceEnumerator,
    StateGenerator,
    metaclass=abc.ABCMeta
):
    """
    ServiceEnumerator which additionally detects state changes caused by
    its last request and provides a transition function for them.
    """

    def __init__(self):
        # type: () -> None
        super(StateGeneratingServiceEnumerator, self).__init__()

        # Internal storage of request packets for a certain Edge. If an edge
        # is found during the evaluation of the last result of the
        # ServiceEnumerator, the according request of the result tuple is
        # stored together with the new Edge.
        self._edge_requests = dict()  # type: Dict[_Edge, Packet]

    def get_new_edge(self,
                     socket,  # type: _SocketUnion
                     config  # type: AutomotiveTestCaseExecutorConfiguration
                     ):
        # type: (...) -> Optional[_Edge]
        """
        Basic identification of a new edge. The last response is evaluated.
        If this response packet can modify the state of an Ecu, this new
        state is returned, otherwise None.

        :param socket: Socket to the DUT (unused)
        :param config: Global configuration of the executor (unused)
        :return: tuple of old EcuState and new EcuState, or None
        """
        try:
            state, req, resp, _, _ = cast(ServiceEnumerator, self).results[-1]
        except IndexError:
            # no results yet
            return None

        if resp is not None and EcuState.is_modifier_pkt(resp):
            new_state = EcuState.get_modified_ecu_state(resp, req, state)
            if new_state == state:
                return None
            else:
                edge = (state, new_state)
                self._edge_requests[edge] = req
                return edge
        else:
            return None

    @staticmethod
    def transition_function(
            sock,  # type: _SocketUnion
            config,  # type: AutomotiveTestCaseExecutorConfiguration
            kwargs  # type: Dict[str, Any]
    ):
        # type: (...) -> bool
        """
        Very basic transition function. This function sends a given request
        in kwargs and evaluates the response.

        :param sock: Connection to the DUT
        :param config: Global configuration of the executor (unused)
        :param kwargs: Dictionary with arguments. This function only uses
                       the argument *"req"* which must contain a Packet,
                       causing an EcuState transition of the DUT.
        :return: True in case of a successful transition, else False
        """
        req = kwargs.get("req", None)
        if req is None:
            return False

        try:
            res = sock.sr1(req, timeout=20, verbose=False, chainEX=True)
            return res is not None and res.service != 0x7f
        except (OSError, ValueError, Scapy_Exception) as e:
            log_automotive.exception(
                "Exception in transition function: %s", e)
            return False

    def get_transition_function_description(self, edge):
        # type: (_Edge) -> str
        """
        :param edge: Edge of interest
        :return: repr of the request which was stored for this edge
        :raises KeyError: no request is stored for this edge
        """
        return repr(self._edge_requests[edge])

    def get_transition_function_kwargs(self, edge):
        # type: (_Edge) -> Dict[str, Any]
        """
        :param edge: Edge of interest
        :return: Dictionary with the description ("desc") and the request
                 ("req") for `transition_function`
        :raises KeyError: no request is stored for this edge
        """
        req = self._edge_requests[edge]
        kwargs = {
            "desc": self.get_transition_function_description(edge),
            "req": req
        }
        return kwargs

    def get_transition_function(self, socket, edge):
        # type: (_SocketUnion, _Edge) -> Optional[_TransitionTuple]
        """
        :param socket: Socket to the DUT (unused)
        :param edge: Edge of interest
        :return: Tuple of transition function, its kwargs and None, or
                 None if no request is stored for this edge
        """
        try:
            return self.transition_function, \
                self.get_transition_function_kwargs(edge), None
        except KeyError:
            return None