• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Nils Weiss <nils@we155.de>
5
6# scapy.contrib.description = ServiceEnumerator definitions
7# scapy.contrib.status = library
8
9
10import abc
11import threading
12import time
13import copy
14from collections import defaultdict, OrderedDict
15from itertools import chain
16from typing import NamedTuple
17
18from scapy.compat import orb
19from scapy.contrib.automotive import log_automotive
20from scapy.error import Scapy_Exception
21from scapy.utils import make_lined_table, EDecimal, PeriodicSenderThread
22from scapy.packet import Packet
23from scapy.contrib.automotive.ecu import EcuState, EcuResponse
24from scapy.contrib.automotive.scanner.test_case import AutomotiveTestCase, \
25    StateGenerator, _SocketUnion, _TransitionTuple
26from scapy.contrib.automotive.scanner.configuration import \
27    AutomotiveTestCaseExecutorConfiguration
28from scapy.contrib.automotive.scanner.graph import _Edge
29
30# Typing imports
31from typing import (
32    Any,
33    Union,
34    List,
35    Optional,
36    Iterable,
37    Dict,
38    Tuple,
39    Set,
40    Callable,
41    cast,
42)
43
# Definition outside the class ServiceEnumerator to allow pickling
# One record per stored request: the ECU state it was sent in, the request,
# the response (None if nothing was received), and both timestamps.
_AutomotiveTestCaseScanResult = NamedTuple(
    "_AutomotiveTestCaseScanResult",
    [("state", EcuState),
     ("req", Packet),
     ("resp", Optional[Packet]),
     ("req_ts", Union[EDecimal, float]),
     ("resp_ts", Optional[Union[EDecimal, float]])])

# Same field layout as _AutomotiveTestCaseScanResult, but typed for results
# that carry a response: ``resp`` and ``resp_ts`` are not Optional here.
_AutomotiveTestCaseFilteredScanResult = NamedTuple(
    "_AutomotiveTestCaseFilteredScanResult",
    [("state", EcuState),
     ("req", Packet),
     ("resp", Packet),
     ("req_ts", Union[EDecimal, float]),
     ("resp_ts", Union[EDecimal, float])])
60
61
62class ServiceEnumerator(AutomotiveTestCase, metaclass=abc.ABCMeta):
63    """
64    Base class for ServiceEnumerators of automotive diagnostic protocols
65    """
66
    # Start from a copy of the parent's table so the update below does not
    # modify AutomotiveTestCase._supported_kwargs itself.
    _supported_kwargs = copy.copy(AutomotiveTestCase._supported_kwargs)
    # Each entry maps a keyword name to a tuple of
    # (accepted type or tuple of types, validation callable or None).
    # NOTE(review): presumably consumed by check_kwargs() of the base class,
    # which is not visible in this file - confirm there.
    _supported_kwargs.update({
        'timeout': ((int, float), lambda x: x > 0),
        'count': (int, lambda x: x >= 0),
        'execution_time': (int, None),
        'state_allow_list': ((list, EcuState), None),
        'state_block_list': ((list, EcuState), None),
        'retry_if_none_received': (bool, None),
        'exit_if_no_answer_received': (bool, None),
        'exit_if_service_not_supported': (bool, None),
        'exit_scan_on_first_negative_response': (bool, None),
        'retry_if_busy_returncode': (bool, None),
        'stop_event': (threading.Event, None),
        'debug': (bool, None),
        'scan_range': ((list, tuple, range), None),
        'unittest': (bool, None),
        'disable_tps_while_sending': (bool, None),
        'inter': ((int, float), lambda x: x >= 0),
    })

    # Parameter documentation for the keywords above, appended to the
    # parent's text. It is assigned to ``execute.__doc__`` further down in
    # this class, so it is runtime-visible text.
    _supported_kwargs_doc = AutomotiveTestCase._supported_kwargs_doc + """
        :param timeout: Timeout until a response will arrive after a request
        :type timeout: integer or float
        :param integer count: Number of request to be sent in one execution
        :param int execution_time: Time in seconds until the execution of
                                   this enumerator is stopped.
        :param state_allow_list: List of EcuState objects or EcuState object
                                 in which the the execution of this enumerator
                                 is allowed. If provided, other states will not
                                 be executed.
        :type state_allow_list: EcuState or list
        :param state_block_list: List of EcuState objects or EcuState object
                                 in which the the execution of this enumerator
                                 is blocked.
        :type state_block_list: EcuState or list
        :param bool retry_if_none_received: Specifies if a request will be send
                                            again, if None was received
                                            (usually because of a timeout).
        :param bool exit_if_no_answer_received: Specifies to finish the
                                                execution of this enumerator
                                                once None is  received.
        :param bool exit_if_service_not_supported: Specifies to finish the
                                                   execution of this
                                                   enumerator, once the
                                                   negative return code
                                                   'serviceNotSupported' is
                                                   received.
        :param bool exit_scan_on_first_negative_response: Specifies to finish
                                                          the execution once a
                                                          negative response is
                                                          received.
        :param bool retry_if_busy_returncode: Specifies to retry a request, if
                                              the 'busyRepeatRequest' negative
                                              response code is received.
        :param bool debug: Enables debug functions during execute.
        :param Event stop_event: Signals immediate stop of the execution.
        :param scan_range: Specifies the identifiers to be scanned.
        :type scan_range: list or tuple or range or iterable
        :param disable_tps_while_sending: Temporary disables a TesterPresentSender
                                          to not interact with a seed request.
        :type disable_tps_while_sending: bool
        :param inter: delay between two packets during sending
        :type inter: int or float"""
130
131    def __init__(self):
132        # type: () -> None
133        super(ServiceEnumerator, self).__init__()
134        self._result_packets = OrderedDict()  # type: Dict[bytes, Packet]
135        self._results = list()  # type: List[_AutomotiveTestCaseScanResult]
136        self._request_iterators = dict()  # type: Dict[EcuState, Iterable[Packet]]  # noqa: E501
137        self._retry_pkt = defaultdict(list)  # type: Dict[EcuState, Union[Packet, Iterable[Packet]]]  # noqa: E501
138        self._negative_response_blacklist = [0x10, 0x11]  # type: List[int]
139        self._requests_per_state_estimated = None  # type: Optional[int]
140        self._tester_present_sender = None  # type: Optional[PeriodicSenderThread]
141
    @staticmethod
    @abc.abstractmethod
    def _get_negative_response_code(resp):
        # type: (Packet) -> int
        """
        Return the negative response code carried by a negative response.

        Abstract: has to be provided by the protocol specific subclass.

        :param resp: A negative response packet
        :return: The negative response code as integer
        """
        raise NotImplementedError()
147
    @staticmethod
    @abc.abstractmethod
    def _get_negative_response_desc(nrc):
        # type: (int) -> str
        """
        Return a description text for a negative response code.

        Abstract: has to be provided by the protocol specific subclass.

        :param nrc: Negative response code
        :return: A string describing the code
        """
        raise NotImplementedError()
153
154    def _get_table_entry_x(self, tup):
155        # type: (_AutomotiveTestCaseScanResult) -> str
156        """
157        Provides a table entry for the column which gets print during `show()`.
158        :param tup: A results tuple
159        :return: A string which describes the state
160        """
161        return str(tup[0])
162
163    def _get_table_entry_y(self, tup):
164        # type: (_AutomotiveTestCaseScanResult) -> str
165        """
166        Provides a table entry for the line which gets print during `show()`.
167        :param tup: A results tuple
168        :return: A string which describes the request
169        """
170        return repr(tup[1])
171
172    def _get_table_entry_z(self, tup):
173        # type: (_AutomotiveTestCaseScanResult) -> str
174        """
175        Provides a table entry for the field which gets print during `show()`.
176        :param tup: A results tuple
177        :return: A string which describes the response
178        """
179        return repr(tup[2])
180
    @staticmethod
    @abc.abstractmethod
    def _get_negative_response_label(response):
        # type: (Packet) -> str
        """
        Return the label text for a negative response packet.

        Abstract: has to be provided by the protocol specific subclass.
        `_get_label` calls this for responses whose first byte is 0x7f.

        :param response: A negative response packet
        :return: A label string for this response
        """
        raise NotImplementedError()
186
    @abc.abstractmethod
    def _get_initial_requests(self, **kwargs):
        # type: (Any) -> Iterable[Packet]
        """
        Provide the requests this enumerator sends in every state.

        Abstract: has to be overwritten by subclasses. The returned iterable
        is wrapped in an iterator once per state and is also consumed once
        to estimate the number of requests per state.

        :param kwargs: Keyword arguments handed over from `execute`
        :return: An iterable of request packets
        """
        raise NotImplementedError("Overwrite this method")
191
192    def __reduce__(self):  # type: ignore
193        f, t, d = super(ServiceEnumerator, self).__reduce__()  # type: ignore
194        try:
195            for k, v in d["_request_iterators"].items():
196                d["_request_iterators"][k] = list(v)
197        except KeyError:
198            pass
199
200        try:
201            for k in d["_retry_pkt"]:
202                d["_retry_pkt"][k] = list(self._get_retry_iterator(k))
203        except KeyError:
204            pass
205        return f, t, d
206
    @property
    def negative_response_blacklist(self):
        # type: () -> List[int]
        """
        Negative response codes which are excluded from `filtered_results`.

        :return: The internal list itself (not a copy); other methods of
                 this class append to and remove from it.
        """
        return self._negative_response_blacklist
211
212    @property
213    def completed(self):
214        # type: () -> bool
215        if len(self._results):
216            return all([self.has_completed(s) for s in self.scanned_states])
217        else:
218            return super(ServiceEnumerator, self).completed
219
220    def _store_result(self, state, req, res):
221        # type: (EcuState, Packet, Optional[Packet]) -> None
222        if bytes(req) not in self._result_packets:
223            self._result_packets[bytes(req)] = req
224
225        if res and bytes(res) not in self._result_packets:
226            self._result_packets[bytes(res)] = res
227
228        self._results.append(_AutomotiveTestCaseScanResult(
229            state,
230            self._result_packets[bytes(req)],
231            self._result_packets[bytes(res)] if res is not None else None,
232            req.sent_time or 0.0,
233            res.time if res is not None else None))
234
235    def _get_retry_iterator(self, state):
236        # type: (EcuState) -> Iterable[Packet]
237        retry_entry = self._retry_pkt[state]
238        if isinstance(retry_entry, Packet):
239            log_automotive.debug("Provide retry packet")
240            return [retry_entry]
241        elif isinstance(retry_entry, list):
242            if len(retry_entry):
243                log_automotive.debug("Provide retry list")
244        else:
245            log_automotive.debug("Provide retry iterator")
246            # assume self.retry_pkt is a generator or list
247
248        return retry_entry
249
250    def _get_initial_request_iterator(self, state, **kwargs):
251        # type: (EcuState, Any) -> Iterable[Packet]
252        if state not in self._request_iterators:
253            self._request_iterators[state] = iter(
254                self._get_initial_requests(**kwargs))
255
256        return self._request_iterators[state]
257
258    def _get_request_iterator(self, state, **kwargs):
259        # type: (EcuState, Optional[Dict[str, Any]]) -> Iterable[Packet]
260        return chain(self._get_retry_iterator(state),
261                     self._get_initial_request_iterator(state, **kwargs))
262
263    def _prepare_runtime_estimation(self, **kwargs):
264        # type: (Optional[Dict[str, Any]]) -> None
265        if self._requests_per_state_estimated is None:
266            try:
267                initial_requests = self._get_initial_requests(**kwargs)
268                self._requests_per_state_estimated = len(list(initial_requests))
269            except NotImplementedError:
270                pass
271
272    def runtime_estimation(self):
273        # type: () -> Optional[Tuple[int, int, float]]
274        if self._requests_per_state_estimated is None:
275            return None
276
277        pkts_tbs = max(
278            len(self.scanned_states) * self._requests_per_state_estimated, 1)
279        pkts_snt = len(self.results)
280
281        return pkts_tbs, pkts_snt, float(pkts_snt) / pkts_tbs
282
283    def pre_execute(self, socket, state, global_configuration):
284        # type: (_SocketUnion, EcuState, AutomotiveTestCaseExecutorConfiguration) -> None  # noqa: E501
285        try:
286            self._tester_present_sender = global_configuration["tps"]
287        except KeyError:
288            self._tester_present_sender = None
289
290    def execute(self, socket, state, **kwargs):
291        # type: (_SocketUnion, EcuState, Any) -> None
292        self.check_kwargs(kwargs)
293        timeout = kwargs.pop('timeout', 1)
294        count = kwargs.pop('count', None)
295        execution_time = kwargs.pop("execution_time", 1200)
296        stop_event = kwargs.pop("stop_event", None)  # type: Optional[threading.Event]  # noqa: E501
297        disable_tps = kwargs.pop("disable_tps_while_sending", False)
298        inter = kwargs.pop("inter", 0)
299
300        self._prepare_runtime_estimation(**kwargs)
301
302        state_block_list = kwargs.get('state_block_list', list())
303
304        if state_block_list and state in state_block_list:
305            self._state_completed[state] = True
306            log_automotive.debug("State %s in block list!", repr(state))
307            return
308
309        state_allow_list = kwargs.get('state_allow_list', list())
310
311        if state_allow_list and state not in state_allow_list:
312            self._state_completed[state] = True
313            log_automotive.debug("State %s not in allow list!",
314                                 repr(state))
315            return
316
317        it = self._get_request_iterator(state, **kwargs)
318
319        # log_automotive.debug("[i] Using iterator %s in state %s", it, state)
320
321        start_time = time.monotonic()
322        log_automotive.debug(
323            "Start execution of enumerator: %s", time.ctime())
324
325        for req in it:
326            if stop_event:
327                stop_event.wait(timeout=inter)
328            else:
329                time.sleep(inter)
330
331            if disable_tps and self._tester_present_sender:
332                self._tester_present_sender.disable()
333
334            res = self.sr1_with_retry_on_error(req, socket, state, timeout)
335
336            if disable_tps and self._tester_present_sender:
337                self._tester_present_sender.enable()
338
339            self._store_result(state, req, res)
340
341            if self._evaluate_response(state, req, res, **kwargs):
342                log_automotive.debug(
343                    "Stop test_case execution because of response evaluation")
344                return
345
346            if count is not None:
347                count -= 1
348                if count <= 0:
349                    log_automotive.debug(
350                        "Finished execution count of enumerator")
351                    return
352
353            if (start_time + execution_time) < time.monotonic():
354                log_automotive.debug(
355                    "[i] Finished execution time of enumerator: %s",
356                    time.ctime())
357                return
358
359            if stop_event is not None and stop_event.is_set():
360                log_automotive.info(
361                    "Stop test_case execution because of stop event")
362                return
363
364        log_automotive.info("Finished iterator execution")
365        self._state_completed[state] = True
366        log_automotive.debug("States completed %s",
367                             repr(self._state_completed))
368
369    execute.__doc__ = _supported_kwargs_doc
370
371    def sr1_with_retry_on_error(self, req, socket, state, timeout):
372        # type: (Packet, _SocketUnion, EcuState, int) -> Optional[Packet]
373        try:
374            res = socket.sr1(req, timeout=timeout, verbose=False,
375                             chainEX=True, chainCC=True)
376        except (OSError, ValueError, Scapy_Exception) as e:
377            if not self._populate_retry(state, req):
378                log_automotive.exception(
379                    "Exception during retry. This is bad")
380            raise e
381        return res
382
383    def _evaluate_response(self,
384                           state,  # type: EcuState
385                           request,  # type: Packet
386                           response,  # type: Optional[Packet]
387                           **kwargs  # type: Optional[Dict[str, Any]]
388                           ):  # type: (...) -> bool
389        """
390        Evaluates the response and determines if the current scan execution
391        should be stopped.
392        :param state: Current state of the ECU under test
393        :param request: Sent request
394        :param response: Received response
395        :param kwargs: Arguments to modify the behavior of this function.
396                       Supported arguments:
397                         - retry_if_none_received: True/False
398                         - exit_if_no_answer_received: True/False
399                         - exit_if_service_not_supported: True/False
400                         - exit_scan_on_first_negative_response: True/False
401                         - retry_if_busy_returncode: True/False
402        :return: True, if current execution needs to be interrupted.
403                 False, if enumerator should proceed with the execution.
404        """
405        if response is None:
406            if cast(bool, kwargs.pop("retry_if_none_received", False)):
407                log_automotive.debug(
408                    "Retry %s because None received", repr(request))
409                return self._populate_retry(state, request)
410            return cast(bool, kwargs.pop("exit_if_no_answer_received", False))
411
412        if self._evaluate_negative_response_code(
413                state, response, **kwargs):
414            # leave current execution, because of a negative response code
415            return True
416
417        if self._evaluate_retry(state, request, response, **kwargs):
418            # leave current execution, because a retry was set
419            return True
420
421        # cleanup retry packet
422        self._retry_pkt[state] = []
423
424        return self._evaluate_ecu_state_modifications(state, request, response)
425
426    def _evaluate_ecu_state_modifications(self,
427                                          state,  # type: EcuState
428                                          request,  # type: Packet
429                                          response,  # type: Packet
430                                          ):  # type: (...) -> bool
431        if EcuState.is_modifier_pkt(response):
432            if state != EcuState.get_modified_ecu_state(
433                    response, request, state):
434                log_automotive.debug(
435                    "Exit execute. Ecu state was modified!")
436                return True
437        return False
438
439    def _evaluate_negative_response_code(self,
440                                         state,  # type: EcuState
441                                         response,  # type: Packet
442                                         **kwargs  # type: Optional[Dict[str, Any]]  # noqa: E501
443                                         ):  # type: (...) -> bool
444        exit_if_service_not_supported = \
445            kwargs.pop("exit_if_service_not_supported", False)
446        exit_scan_on_first_negative_response = \
447            kwargs.pop("exit_scan_on_first_negative_response", False)
448
449        if exit_scan_on_first_negative_response and response.service == 0x7f:
450            return True
451
452        if exit_if_service_not_supported and response.service == 0x7f:
453            response_code = self._get_negative_response_code(response)
454            if response_code in [0x11, 0x7f]:
455                names = {0x11: "serviceNotSupported",
456                         0x7f: "serviceNotSupportedInActiveSession"}
457                log_automotive.debug(
458                    "Exit execute because negative response %s received!",
459                    names[response_code])
460                # execute of current state is completed,
461                # since a serviceNotSupported negative response was received
462                self._state_completed[state] = True
463                # stop current execute and exit
464                return True
465        return False
466
467    def _populate_retry(self,
468                        state,  # type: EcuState
469                        request,  # type: Packet
470                        ):  # type: (...) -> bool
471        """
472        Populates internal storage with request for a retry.
473
474        :param state: Current state
475        :param request: Request which needs a retry
476        :return: True, if storage was populated. If False is returned, the
477                 retry storage is still populated. This indicates that the
478                 current execution was already a retry execution.
479        """
480
481        if not self._get_retry_iterator(state):
482            # This was no retry since the retry_pkt is None
483            self._retry_pkt[state] = request
484            log_automotive.debug(
485                "Exit execute. Retry packet next time!")
486            return True
487        else:
488            # This was a unsuccessful retry, continue execute
489            log_automotive.debug("Unsuccessful retry!")
490            return False
491
492    def _evaluate_retry(self,
493                        state,  # type: EcuState
494                        request,  # type: Packet
495                        response,  # type: Packet
496                        **kwargs  # type: Optional[Dict[str, Any]]
497                        ):  # type: (...) -> bool
498        retry_if_busy_returncode = \
499            kwargs.pop("retry_if_busy_returncode", True)
500
501        if retry_if_busy_returncode and response.service == 0x7f \
502                and self._get_negative_response_code(response) == 0x21:
503            log_automotive.debug(
504                "Retry %s because retry_if_busy_returncode received",
505                repr(request))
506            return self._populate_retry(state, request)
507        return False
508
509    def _compute_statistics(self):
510        # type: () -> List[Tuple[str, str, str]]
511        data_sets = [("all", self._results)]
512
513        for state in self._state_completed.keys():
514            data_sets.append((repr(state),
515                              [r for r in self._results if r.state == state]))
516
517        stats = list()  # type: List[Tuple[str, str, str]]
518
519        for desc, data in data_sets:
520            answered = [cast(_AutomotiveTestCaseFilteredScanResult, r)
521                        for r in data if r.resp is not None and
522                        r.resp_ts is not None]
523            unanswered = [r for r in data if r.resp is None]
524            answertimes = [float(x.resp_ts) - float(x.req_ts)
525                           for x in answered]
526            answertimes_nr = [float(x.resp_ts) - float(x.req_ts)
527                              for x in answered if x.resp.service == 0x7f]
528            answertimes_pr = [float(x.resp_ts) - float(x.req_ts)
529                              for x in answered if x.resp.service != 0x7f]
530
531            nrs = [r.resp for r in answered if r.resp.service == 0x7f]
532            stats.append((desc, "num_answered", str(len(answered))))
533            stats.append((desc, "num_unanswered", str(len(unanswered))))
534            stats.append((desc, "num_negative_resps", str(len(nrs))))
535
536            for postfix, times in zip(
537                    ["", "_nr", "_pr"],
538                    [answertimes, answertimes_nr, answertimes_pr]):
539                try:
540                    ma = str(round(max(times), 5))
541                except ValueError:
542                    ma = "-"
543
544                try:
545                    mi = str(round(min(times), 5))
546                except ValueError:
547                    mi = "-"
548
549                try:
550                    avg = str(round(sum(times) / len(times), 5))
551                except (ValueError, ZeroDivisionError):
552                    avg = "-"
553
554                stats.append((desc, "answertime_min" + postfix, mi))
555                stats.append((desc, "answertime_max" + postfix, ma))
556                stats.append((desc, "answertime_avg" + postfix, avg))
557
558        return stats
559
560    def _show_statistics(self, **kwargs):
561        # type: (Any) -> str
562        stats = self._compute_statistics()
563
564        s = "%d requests were sent, %d answered, %d unanswered" % \
565            (len(self._results),
566             len(self.results_with_response),
567             len(self.results_without_response)) + "\n"
568
569        s += "Statistics per state\n"
570        s += make_lined_table(stats, lambda *x: x, dump=True, sortx=str,
571                              sorty=str) or ""
572
573        return s + "\n"
574
575    def _prepare_negative_response_blacklist(self):
576        # type: () -> None
577        nrc_dict = defaultdict(int)  # type: Dict[int, int]
578        for nr in self.results_with_negative_response:
579            nrc_dict[self._get_negative_response_code(nr.resp)] += 1
580
581        total_nr_count = len(self.results_with_negative_response)
582        for nrc, nr_count in nrc_dict.items():
583            if nrc not in self.negative_response_blacklist and \
584                    nr_count > 30 and (nr_count / total_nr_count) > 0.3:
585                log_automotive.info("Added NRC 0x%02x to filter", nrc)
586                self.negative_response_blacklist.append(nrc)
587
588            if nrc in self.negative_response_blacklist and nr_count < 10:
589                log_automotive.info("Removed NRC 0x%02x to filter", nrc)
590                self.negative_response_blacklist.remove(nrc)
591
    @property
    def results(self):
        # type: () -> List[_AutomotiveTestCaseScanResult]
        """
        All stored results, with and without response.

        :return: The internal results list itself (not a copy)
        """
        return self._results
596
597    @property
598    def results_with_response(self):
599        # type: () -> List[_AutomotiveTestCaseFilteredScanResult]
600        filtered_results = list()
601        for r in self._results:
602            if r.resp is None:
603                continue
604            if r.resp_ts is None:
605                continue
606            fr = cast(_AutomotiveTestCaseFilteredScanResult, r)
607            filtered_results.append(fr)
608        return filtered_results
609
610    @property
611    def filtered_results(self):
612        # type: () -> List[_AutomotiveTestCaseFilteredScanResult]
613        filtered_results = self.results_with_positive_response
614
615        for r in self.results_with_negative_response:
616            nrc = self._get_negative_response_code(r.resp)
617            if nrc not in self.negative_response_blacklist:
618                filtered_results.append(r)
619        return filtered_results
620
621    @property
622    def scanned_states(self):
623        # type: () -> Set[EcuState]
624        """
625        Helper function to get all sacnned states in results
626        :return: all scanned states
627        """
628        return set([tup.state for tup in self._results])
629
630    @property
631    def results_with_negative_response(self):
632        # type: () -> List[_AutomotiveTestCaseFilteredScanResult]
633        """
634        Helper function to get all results with negative response
635        :return: all results with negative response
636        """
637        return [r for r in self.results_with_response
638                if r.resp and r.resp.service == 0x7f]
639
640    @property
641    def results_with_positive_response(self):
642        # type: () -> List[_AutomotiveTestCaseFilteredScanResult]
643        """
644        Helper function to get all results with positive response
645        :return: all results with positive response
646        """
647        return [r for r in self.results_with_response  # noqa: E501
648                if r.resp and r.resp.service != 0x7f]
649
650    @property
651    def results_without_response(self):
652        # type: () -> List[_AutomotiveTestCaseScanResult]
653        """
654        Helper function to get all results without response
655        :return: all results without response
656        """
657        return [r for r in self._results if r.resp is None]
658
659    def _show_negative_response_details(self, **kwargs):
660        # type: (Any) -> str
661        nrc_dict = defaultdict(int)  # type: Dict[int, int]
662        for nr in self.results_with_negative_response:
663            nrc_dict[self._get_negative_response_code(nr.resp)] += 1
664
665        s = "These negative response codes were received " + \
666            " ".join([hex(c) for c in nrc_dict.keys()]) + "\n"
667        for nrc, nr_count in nrc_dict.items():
668            s += "\tNRC 0x%02x: %s received %d times" % (
669                nrc, self._get_negative_response_desc(nrc), nr_count)
670            s += "\n"
671
672        return s + "\n"
673
674    def _show_negative_response_information(self, **kwargs):
675        # type: (Any) -> str
676        filtered = kwargs.get("filtered", True)
677        s = "%d negative responses were received\n" % \
678            len(self.results_with_negative_response)
679
680        s += "\n"
681
682        s += self._show_negative_response_details(**kwargs) or "" + "\n"
683        if filtered and len(self.negative_response_blacklist):
684            s += "The following negative response codes are blacklisted: %s\n" \
685                 % [self._get_negative_response_desc(nr)
686                    for nr in self.negative_response_blacklist]
687
688        return s + "\n"
689
690    def _show_results_information(self, **kwargs):
691        # type: (Any) -> str
692        def _get_table_entry(
693            *args: Any
694        ):  # type: (...) -> Tuple[str, str, str]
695            tup = cast(_AutomotiveTestCaseScanResult, args)
696            return self._get_table_entry_x(tup), \
697                self._get_table_entry_y(tup), \
698                self._get_table_entry_z(tup)
699
700        filtered = kwargs.get("filtered", True)
701        s = "=== No data to display ===\n"
702        data = self._results if not filtered else self.filtered_results  # type: Union[List[_AutomotiveTestCaseScanResult], List[_AutomotiveTestCaseFilteredScanResult]]  # noqa: E501
703        if len(data):
704            s = make_lined_table(
705                data, _get_table_entry, dump=True, sortx=str) or ""
706
707        return s + "\n"
708
709    def show(self, dump=False, filtered=True, verbose=False):
710        # type: (bool, bool, bool) -> Optional[str]
711        if filtered:
712            self._prepare_negative_response_blacklist()
713
714        show_functions = [self._show_header,
715                          self._show_statistics,
716                          self._show_negative_response_information,
717                          self._show_results_information]
718
719        if verbose:
720            show_functions.append(self._show_state_information)
721
722        s = "\n".join(x(filtered=filtered) for x in show_functions)
723
724        if dump:
725            return s + "\n"
726        else:
727            print(s)
728            return None
729
730    def _get_label(self, response, positive_case="PR: PositiveResponse"):
731        # type: (Optional[Packet], Union[Callable[[Packet], str], str]) -> str
732        if response is None:
733            return "Timeout"
734        elif orb(bytes(response)[0]) == 0x7f:
735            return self._get_negative_response_label(response)
736        else:
737            if isinstance(positive_case, str):
738                return positive_case
739            elif callable(positive_case):
740                return positive_case(response)
741            else:
742                raise Scapy_Exception("Unsupported Type for positive_case. "
743                                      "Provide a string or a function.")
744
745    @property
746    def supported_responses(self):
747        # type: () -> List[EcuResponse]
748        supported_resps = list()
749        all_responses = [p for p in self._result_packets.values()
750                         if orb(bytes(p)[0]) & 0x40]
751        for resp in all_responses:
752            states = list(set([t.state for t in self.results_with_response
753                               if t.resp == resp]))
754            supported_resps.append(EcuResponse(state=states, responses=resp))
755        return supported_resps
756
757
class StateGeneratingServiceEnumerator(
    ServiceEnumerator,
    StateGenerator,
    metaclass=abc.ABCMeta
):
    """
    ServiceEnumerator which is also a StateGenerator. New edges are derived
    from the last result of the enumerator, and the request of that result
    is remembered so that it can be replayed as transition.
    """

    def __init__(self):
        # type: () -> None
        super().__init__()

        # Requests remembered per edge. Whenever the evaluation of the last
        # result of the ServiceEnumerator reveals an edge, the request of
        # this result tuple is stored under the new edge.
        self._edge_requests = {}  # type: Dict[_Edge, Packet]

    def get_new_edge(self,
                     socket,  # type: _SocketUnion
                     config  # type: AutomotiveTestCaseExecutorConfiguration
                     ):
        # type: (...) -> Optional[_Edge]
        """
        Basic identification of a new edge based on the last result.
        If the response of the last result can modify the state of an Ecu
        and the modified state differs from the state of the result, the
        edge is returned and the request is stored for this edge.

        :param socket: Socket to the DUT (unused)
        :param config: Global configuration of the executor (unused)
        :return: tuple of old EcuState and new EcuState, or None
        """
        try:
            last_result = cast(ServiceEnumerator, self).results[-1]
        except IndexError:
            # No result available
            return None
        state, req, resp, _req_ts, _resp_ts = last_result

        if resp is None or not EcuState.is_modifier_pkt(resp):
            return None

        new_state = EcuState.get_modified_ecu_state(resp, req, state)
        if new_state == state:
            # The response did not lead to a different state
            return None

        edge = (state, new_state)
        self._edge_requests[edge] = req
        return edge

    @staticmethod
    def transition_function(
            sock,  # type: _SocketUnion
            config,  # type: AutomotiveTestCaseExecutorConfiguration
            kwargs  # type: Dict[str, Any]
    ):
        # type: (...) -> bool
        """
        Very basic transition function. The request given in kwargs is
        sent and the response is evaluated.

        :param sock: Connection to the DUT
        :param config: Global configuration of the executor (unused)
        :param kwargs: Dictionary with arguments. Only the argument
                       *"req"* is used. It must contain a Packet which
                       causes an EcuState transition of the DUT.
        :return: True if a response was received and its ``service`` is
                 not 0x7f. False otherwise, also if no request was given
                 or if one of the handled exceptions occurred.
        """
        request = kwargs.get("req")
        if request is None:
            return False

        try:
            response = sock.sr1(
                request, timeout=20, verbose=False, chainEX=True)
            if response is None:
                return False
            return response.service != 0x7f
        except (OSError, ValueError, Scapy_Exception) as ex:
            log_automotive.exception(
                "Exception in transition function: %s", ex)
            return False

    def get_transition_function_description(self, edge):
        # type: (_Edge) -> str
        """
        Describe the transition of an edge by the repr of its request.

        :param edge: Edge to describe
        :return: repr of the request stored for this edge
        :raises KeyError: If no request is stored for this edge
        """
        request = self._edge_requests[edge]
        return repr(request)

    def get_transition_function_kwargs(self, edge):
        # type: (_Edge) -> Dict[str, Any]
        """
        Create the arguments of the transition function for an edge.

        :param edge: Edge to create the arguments for
        :return: Dictionary with the description (*"desc"*) and the stored
                 request (*"req"*) of this edge
        :raises KeyError: If no request is stored for this edge
        """
        request = self._edge_requests[edge]
        description = self.get_transition_function_description(edge)
        return {"desc": description, "req": request}

    def get_transition_function(self, socket, edge):
        # type: (_SocketUnion, _Edge) -> Optional[_TransitionTuple]
        """
        Provide the transition function and its arguments for an edge.

        :param socket: Socket to the DUT (unused)
        :param edge: Edge to get the transition for
        :return: Tuple of transition function, arguments of this function
                 and None, or None if a KeyError occurred while the
                 arguments were created
        """
        try:
            function = self.transition_function
            function_kwargs = self.get_transition_function_kwargs(edge)
        except KeyError:
            return None
        return function, function_kwargs, None
853