• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Nils Weiss <nils@we155.de>
5
6# scapy.contrib.description = TestCase base class definitions
7# scapy.contrib.status = library
8
9
10import abc
11from collections import defaultdict
12
13from scapy.utils import make_lined_table, SingleConversationSocket
14from scapy.supersocket import SuperSocket
15from scapy.contrib.automotive.scanner.graph import _Edge
16from scapy.contrib.automotive.ecu import EcuState, EcuResponse
17from scapy.error import Scapy_Exception
18
19
20# Typing imports
21from typing import (
22    Any,
23    Union,
24    List,
25    Optional,
26    Dict,
27    Tuple,
28    Set,
29    Callable,
30    TYPE_CHECKING,
31)
32if TYPE_CHECKING:
33    from scapy.contrib.automotive.scanner.configuration import AutomotiveTestCaseExecutorConfiguration  # noqa: E501
34
35
36# type definitions
37_SocketUnion = Union[SuperSocket, SingleConversationSocket]
38_TransitionCallable = Callable[[_SocketUnion, "AutomotiveTestCaseExecutorConfiguration", Dict[str, Any]], bool]  # noqa: E501
39_CleanupCallable = Callable[[_SocketUnion, "AutomotiveTestCaseExecutorConfiguration"], bool]  # noqa: E501
40_TransitionTuple = Tuple[_TransitionCallable, Dict[str, Any], Optional[_CleanupCallable]]  # noqa: E501
41
42
43class AutomotiveTestCaseABC(metaclass=abc.ABCMeta):
44    """
45    Base class for "TestCase" objects. In automotive scanners, these TestCase
46    objects are used for individual tasks, for example enumerating over one
47    kind of functionality of the protocol. It is also possible, that
48    these TestCase objects execute complex tests on an ECU.
49    The TestCaseExecuter object has a list of TestCases. The executer
50    manipulates a device under test (DUT), to enter a certain state. In this
51    state, the TestCase object gets executed.
52    """
53
54    _supported_kwargs = {}  # type: Dict[str, Tuple[Any, Optional[Callable[[Any], bool]]]]  # noqa: E501
55    _supported_kwargs_doc = ""
56
57    @abc.abstractmethod
58    def has_completed(self, state):
59        # type: (EcuState) -> bool
60        """
61        Tells if this TestCase was executed for a certain state
62        :param state: State of interest
63        :return: True, if TestCase was executed in the questioned state
64        """
65        raise NotImplementedError()
66
67    @abc.abstractmethod
68    def pre_execute(self,
69                    socket,  # type: _SocketUnion
70                    state,  # type: EcuState
71                    global_configuration  # type: AutomotiveTestCaseExecutorConfiguration  # noqa: E501
72                    ):  # type: (...) -> None
73        """
74        Will be executed previously to ``execute``. This function can be used
75        to manipulate the configuration passed to execute.
76
77        :param socket: Socket object with the connection to a DUT
78        :param state: Current state of the DUT
79        :param global_configuration: Configuration of the TestCaseExecutor
80        """
81        raise NotImplementedError()
82
83    @abc.abstractmethod
84    def execute(self, socket, state, **kwargs):
85        # type: (_SocketUnion, EcuState, Any) -> None
86        """
87        Executes this TestCase for a given state
88
89        :param socket: Socket object with the connection to a DUT
90        :param state: Current state of the DUT
91        :param kwargs: Local configuration of the TestCasesExecutor
92        :return:
93        """
94        raise NotImplementedError()
95
96    @abc.abstractmethod
97    def post_execute(self,
98                     socket,  # type: _SocketUnion
99                     state,  # type: EcuState
100                     global_configuration  # type: AutomotiveTestCaseExecutorConfiguration  # noqa: E501
101                     ):  # type: (...) -> None
102        """
103        Will be executed subsequently to ``execute``. This function can be used
104        for additional evaluations after the ``execute``.
105
106        :param socket: Socket object with the connection to a DUT
107        :param state: Current state of the DUT
108        :param global_configuration: Configuration of the TestCaseExecutor
109        """
110        raise NotImplementedError()
111
112    @abc.abstractmethod
113    def show(self, dump=False, filtered=True, verbose=False):
114        # type: (bool, bool, bool) -> Optional[str]
115        """
116        Shows results of TestCase
117
118        :param dump: If True, the results will be returned; If False, the
119                     results will be printed.
120        :param filtered: If True, the negative responses will be filtered
121                         dynamically.
122        :param verbose: If True, additional information will be provided.
123        :return: test results of TestCase if parameter ``dump`` is True,
124                 else ``None``
125        """
126        raise NotImplementedError()
127
128    @property
129    @abc.abstractmethod
130    def completed(self):
131        # type: () -> bool
132        """
133        Tells if this TestCase is completely executed
134        :return: True, if TestCase is completely executed
135        """
136        raise NotImplementedError
137
138    @property
139    @abc.abstractmethod
140    def supported_responses(self):
141        # type: () -> List[EcuResponse]
142        """
143        Tells the supported responses in TestCase
144        :return: The list of supported responses
145        """
146        raise NotImplementedError
147
148
149class AutomotiveTestCase(AutomotiveTestCaseABC):
150    """ Base class for TestCases"""
151
152    _description = "AutomotiveTestCase"
153    _supported_kwargs = AutomotiveTestCaseABC._supported_kwargs
154    _supported_kwargs_doc = AutomotiveTestCaseABC._supported_kwargs_doc
155
156    def __init__(self):
157        # type: () -> None
158        self._state_completed = defaultdict(bool)  # type: Dict[EcuState, bool]
159
160    def has_completed(self, state):
161        # type: (EcuState) -> bool
162        return self._state_completed[state]
163
164    @classmethod
165    def check_kwargs(cls, kwargs):
166        # type: (Dict[str, Any]) -> None
167        for k, v in kwargs.items():
168            if k not in cls._supported_kwargs.keys():
169                raise Scapy_Exception(
170                    "Keyword-Argument %s not supported for %s" %
171                    (k, cls.__name__))
172            ti, vf = cls._supported_kwargs[k]
173            if ti is not None and not isinstance(v, ti):
174                raise Scapy_Exception(
175                    "Keyword-Value '%s' is not instance of type %s" %
176                    (k, str(ti)))
177            if vf is not None and not vf(v):
178                raise Scapy_Exception(
179                    "Validation Error: '%s: %s' is not in the allowed "
180                    "value range" % (k, str(v))
181                )
182
183    @property
184    def completed(self):
185        # type: () -> bool
186        return all(v for _, v in self._state_completed.items())
187
188    @property
189    def scanned_states(self):
190        # type: () -> Set[EcuState]
191        """
192        Helper function to get all scanned states
193        :return: all scanned states
194        """
195        return set(self._state_completed.keys())
196
197    def pre_execute(self, socket, state, global_configuration):
198        # type: (_SocketUnion, EcuState, AutomotiveTestCaseExecutorConfiguration) -> None  # noqa: E501
199        pass
200
201    def execute(self, socket, state, **kwargs):
202        # type: (_SocketUnion, EcuState, Any) -> None
203        raise NotImplementedError()
204
205    def post_execute(self, socket, state, global_configuration):
206        # type: (_SocketUnion, EcuState, AutomotiveTestCaseExecutorConfiguration) -> None  # noqa: E501
207        pass
208
209    def _show_header(self, **kwargs):
210        # type: (Any) -> str
211        s = "\n\n" + "=" * (len(self._description) + 10) + "\n"
212        s += " " * 5 + self._description + "\n"
213        s += "-" * (len(self._description) + 10) + "\n"
214
215        return s + "\n"
216
217    def _show_state_information(self, **kwargs):
218        # type: (Any) -> str
219        completed = [(state, self._state_completed[state])
220                     for state in self.scanned_states]
221        return make_lined_table(
222            completed, lambda x, y: ("Scan state completed", x, y),
223            dump=True) or ""
224
225    def show(self, dump=False, filtered=True, verbose=False):
226        # type: (bool, bool, bool) -> Optional[str]
227
228        s = self._show_header()
229
230        if verbose:
231            s += self._show_state_information()
232
233        if dump:
234            return s + "\n"
235        else:
236            print(s)
237            return None
238
239
240class TestCaseGenerator(metaclass=abc.ABCMeta):
241    @abc.abstractmethod
242    def get_generated_test_case(self):
243        # type: () -> Optional[AutomotiveTestCaseABC]
244        raise NotImplementedError()
245
246
247class StateGenerator(metaclass=abc.ABCMeta):
248
249    @abc.abstractmethod
250    def get_new_edge(self, socket, config):
251        # type: (_SocketUnion, AutomotiveTestCaseExecutorConfiguration) -> Optional[_Edge]  # noqa: E501
252        raise NotImplementedError
253
254    @abc.abstractmethod
255    def get_transition_function(self, socket, edge):
256        # type: (_SocketUnion, _Edge) -> Optional[_TransitionTuple]
257        """
258
259        :param socket: Socket to target
260        :param edge: Tuple of EcuState objects for the requested
261                     transition function
262        :return: Returns an optional tuple consisting of a transition function,
263                 a keyword arguments dictionary for the transition function
264                 and a cleanup function. Both functions
265                 take a Socket and the TestCaseExecutor configuration as
266                 arguments and return True if the execution was successful.
267                 The first function is the state enter function, the second
268                 function is a cleanup function
269        """
270        raise NotImplementedError
271