• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Nils Weiss <nils@we155.de>
5
6# scapy.contrib.description = Staged AutomotiveTestCase base classes
7# scapy.contrib.status = library
8
9
10from scapy.contrib.automotive import log_automotive
11from scapy.contrib.automotive.scanner.graph import _Edge
12from scapy.contrib.automotive.ecu import EcuState, EcuResponse, Ecu
13from scapy.contrib.automotive.scanner.test_case import AutomotiveTestCaseABC, \
14    TestCaseGenerator, StateGenerator, _SocketUnion
15
16# Typing imports
17from typing import (
18    Any,
19    List,
20    Optional,
21    Dict,
22    Callable,
23    cast,
24    Tuple,
25    TYPE_CHECKING,
26)
27if TYPE_CHECKING:
28    from scapy.contrib.automotive.scanner.test_case import _TransitionTuple
29    from scapy.contrib.automotive.scanner.configuration import \
30        AutomotiveTestCaseExecutorConfiguration
31
# type definitions
# A connector receives the previous and the current TestCase of a pipeline
# and returns the kwargs which are handed to the current TestCase.
_TestCaseConnectorCallable = Callable[
    [AutomotiveTestCaseABC, AutomotiveTestCaseABC],
    Dict[str, Any]
]
35
36
class StagedAutomotiveTestCase(AutomotiveTestCaseABC, TestCaseGenerator, StateGenerator):  # noqa: E501
    """ Helper object to build a pipeline of TestCases. This allows combining
    TestCases and executing them one after another. Custom connector functions
    can be used to exchange and manipulate the configuration of a subsequent
    TestCase.

    :param test_cases: A list of objects following the AutomotiveTestCaseABC
        interface
    :param connectors: A list of connector functions. A connector function
        takes two TestCase objects and returns a dictionary which is provided
        to the second TestCase as kwargs of the execute function. The entry
        at index ``i`` belongs to the TestCase at index ``i``; the entry at
        index 0 is never called, since the first stage has no predecessor.
        Missing or ``None`` entries mean "no connector for this stage".


    Example:
        >>> class MyTestCase2(AutomotiveTestCaseABC):
        ...     pass
        ...
        >>> class MyTestCase1(AutomotiveTestCaseABC):
        ...     pass
        ...
        >>> def connector(testcase1, testcase2):
        ...     scan_range = len(testcase1.results)
        ...     return {"verbose": True, "scan_range": scan_range}
        ...
        >>> tc1 = MyTestCase1()
        >>> tc2 = MyTestCase2()
        >>> pipeline = StagedAutomotiveTestCase([tc1, tc2], [None, connector])
    """

    # Delay the increment of a stage after the current stage is finished
    # has_completed() has to be called five times in order to increment the
    # current stage. This ensures, that the current stage is executed for
    # all possible states of the DUT, and no state is missed for the first
    # TestCase.
    __delay_stages = 5

    # Class-level fallback for the connectors. __reduce__ leaves the
    # connectors out of the pickled state, so a restored instance has no
    # instance attribute of this name and resolves to this default instead.
    __connectors = None  # type: Optional[List[Optional[_TestCaseConnectorCallable]]]  # noqa: E501

    def __init__(self,
                 test_cases,  # type: List[AutomotiveTestCaseABC]
                 connectors=None  # type: Optional[List[Optional[_TestCaseConnectorCallable]]]  # noqa: E501
                 ):  # type: (...) -> None
        super(StagedAutomotiveTestCase, self).__init__()
        self.__test_cases = test_cases
        self.__connectors = connectors
        # index of the stage (TestCase) which is currently executed
        self.__stage_index = 0
        # number of has_completed() calls since the current stage finished
        self.__completion_delay = 0
        # kwargs of the current stage, populated by pre_execute()
        self.__current_kwargs = None  # type: Optional[Dict[str, Any]]

    def __getitem__(self, item):
        # type: (int) -> AutomotiveTestCaseABC
        """Return the TestCase of the stage with index ``item``."""
        return self.__test_cases[item]

    def __len__(self):
        # type: () -> int
        """Return the number of stages in this pipeline."""
        return len(self.__test_cases)

    # TODO: Fix unit tests and remove this function
    def __reduce__(self):  # type: ignore
        """Reduce this object for pickling without the connector functions.

        The state returned by the parent implementation may be the live
        instance ``__dict__``. The connectors are therefore removed from a
        copy, so that pickling does not strip them from this object.
        """
        reduced = super(StagedAutomotiveTestCase, self).__reduce__()  # type: ignore  # noqa: E501
        if len(reduced) < 3 or not isinstance(reduced[2], dict):
            # no dictionary state present, nothing to filter
            return reduced

        state = dict(reduced[2])
        state.pop("_StagedAutomotiveTestCase__connectors", None)
        return reduced[:2] + (state,) + tuple(reduced[3:])

    @property
    def test_cases(self):
        # type: () -> List[AutomotiveTestCaseABC]
        """All TestCases of this pipeline, in stage order."""
        return self.__test_cases

    @property
    def current_test_case(self):
        # type: () -> AutomotiveTestCaseABC
        """The TestCase of the currently active stage."""
        return self[self.__stage_index]

    @property
    def current_connector(self):
        # type: () -> Optional[_TestCaseConnectorCallable]
        """The connector of the currently active stage.

        :return: The connector function or None, if no connectors were
            provided or the connector list has no entry for this stage.
        """
        connectors = self.__connectors
        if not connectors or self.__stage_index >= len(connectors):
            return None
        return connectors[self.__stage_index]

    @property
    def previous_test_case(self):
        # type: () -> Optional[AutomotiveTestCaseABC]
        """The TestCase of the preceding stage, None for the first stage."""
        if self.__stage_index > 0:
            return self.__test_cases[self.__stage_index - 1]
        return None

    def get_generated_test_case(self):
        # type: () -> Optional[AutomotiveTestCaseABC]
        """Forward to the current TestCase, if it is a TestCaseGenerator.

        :return: The result of the current TestCase or None, if an
            AttributeError occurs (e.g. the current TestCase does not provide
            this function).
        """
        try:
            test_case = cast(TestCaseGenerator, self.current_test_case)
            return test_case.get_generated_test_case()
        except AttributeError:
            return None

    def get_new_edge(self,
                     socket,  # type: _SocketUnion
                     config  # type: AutomotiveTestCaseExecutorConfiguration
                     ):  # type: (...) -> Optional[_Edge]
        """Forward to the current TestCase, if it is a StateGenerator.

        :param socket: Socket which is handed to the current TestCase
        :param config: Configuration which is handed to the current TestCase
        :return: The result of the current TestCase or None, if an
            AttributeError occurs (e.g. the current TestCase does not provide
            this function).
        """
        try:
            test_case = cast(StateGenerator, self.current_test_case)
            return test_case.get_new_edge(socket, config)
        except AttributeError:
            return None

    def get_transition_function(self, socket, edge):
        # type: (_SocketUnion, _Edge) -> Optional[_TransitionTuple]
        """Forward to the current TestCase, if it is a StateGenerator.

        :param socket: Socket which is handed to the current TestCase
        :param edge: Edge which is handed to the current TestCase
        :return: The result of the current TestCase or None, if an
            AttributeError occurs (e.g. the current TestCase does not provide
            this function).
        """
        try:
            test_case = cast(StateGenerator, self.current_test_case)
            return test_case.get_transition_function(socket, edge)
        except AttributeError:
            return None

    def has_completed(self, state):
        # type: (EcuState) -> bool
        """Check the completion of the pipeline and advance the stage.

        Besides answering the question, this function drives the pipeline:
        once the current stage reported completion for more than
        ``__delay_stages`` consecutive calls, the next stage is entered.

        :param state: State which is handed to the current TestCase
        :return: True only if the last stage is completed and the completion
            delay has elapsed, otherwise False.
        """
        current = self.current_test_case
        if not (current.has_completed(state) and current.completed):
            # current test_case not fully completed
            # reset completion delay, since new states could have appeared
            self.__completion_delay = 0
            return False

        # current stage is finished. We have to increase the stage
        if self.__completion_delay < StagedAutomotiveTestCase.__delay_stages:
            # First we wait five more iterations of the executor
            # Maybe one more execution reveals new states of other
            # test_cases
            self.__completion_delay += 1
            return False

        # current test_case is fully completed
        if self.__stage_index == len(self.__test_cases) - 1:
            # this test_case was the last test_case... nothing to do
            return True

        # We waited more iterations and no new state appeared,
        # let's enter the next stage
        log_automotive.info(
            "Staged AutomotiveTestCase %s completed",
            self.current_test_case.__class__.__name__)
        self.__stage_index += 1
        self.__completion_delay = 0
        return False

    def pre_execute(self,
                    socket,  # type: _SocketUnion
                    state,  # type: EcuState
                    global_configuration  # type: AutomotiveTestCaseExecutorConfiguration  # noqa: E501
                    ):  # type: (...) -> None
        """Prepare the kwargs of the current stage and forward the call.

        The kwargs are looked up in ``global_configuration`` under the class
        name of the current TestCase (an empty dict is stored there, if the
        lookup raises a KeyError). From the second stage on, the result of
        the current connector is merged into these kwargs.

        :param socket: Socket which is handed to the current TestCase
        :param state: State which is handed to the current TestCase
        :param global_configuration: Configuration of the executor
        """
        test_case_cls = self.current_test_case.__class__
        try:
            self.__current_kwargs = global_configuration[
                test_case_cls.__name__]
        except KeyError:
            self.__current_kwargs = dict()
            global_configuration[test_case_cls.__name__] = \
                self.__current_kwargs

        connector = self.current_connector
        # previous_test_case is None exactly for the first stage. An explicit
        # None check is used, since a TestCase may define __len__ and
        # therefore evaluate to False.
        previous = self.previous_test_case
        if callable(connector) and previous is not None:
            con_kwargs = connector(previous, self.current_test_case)
            if self.__current_kwargs is not None and con_kwargs is not None:
                self.__current_kwargs.update(con_kwargs)

        log_automotive.debug("Stage AutomotiveTestCase %s kwargs: %s",
                             self.current_test_case.__class__.__name__,
                             self.__current_kwargs)

        self.current_test_case.pre_execute(socket, state, global_configuration)

    def execute(self, socket, state, **kwargs):
        # type: (_SocketUnion, EcuState, Any) -> None
        """Execute the current TestCase.

        The kwargs prepared by pre_execute() take precedence over the
        provided ``kwargs``.
        """
        kwargs.update(self.__current_kwargs or dict())
        self.current_test_case.execute(socket, state, **kwargs)

    def post_execute(self,
                     socket,  # type: _SocketUnion
                     state,  # type: EcuState
                     global_configuration  # type: AutomotiveTestCaseExecutorConfiguration  # noqa: E501
                     ):  # type: (...) -> None
        """Forward the post_execute call to the current TestCase."""
        self.current_test_case.post_execute(
            socket, state, global_configuration)

    @staticmethod
    def _show_headline(headline, sep="="):
        # type: (str, str) -> str
        """Return ``headline`` framed by two lines of ``sep`` characters."""
        rule = sep * (len(headline) + 10) + "\n"
        return "\n\n" + rule + " " * 5 + headline + "\n" + rule + "\n"

    def show(self, dump=False, filtered=True, verbose=False):
        # type: (bool, bool, bool) -> Optional[str]
        """Show the results of all stages of this pipeline.

        :param dump: If True, the output is returned instead of printed
        :param filtered: Handed to the show function of every TestCase
        :param verbose: Handed to the show function of every TestCase
        :return: The output as string if ``dump`` is True, otherwise None
        """
        parts = [self._show_headline("AutomotiveTestCase Pipeline", "=")]
        for idx, t in enumerate(self.__test_cases):
            parts.append(self._show_headline(
                "AutomotiveTestCase Stage %d" % idx, "-"))
            # every stage is dumped, a missing dump is treated as empty
            parts.append(t.show(True, filtered, verbose) or "")
        s = "".join(parts)

        if dump:
            return s + "\n"
        else:
            print(s)
            return None

    @property
    def completed(self):
        # type: () -> bool
        """True if all TestCases are completed and the delay has elapsed."""
        return all(e.completed for e in self.__test_cases) and \
            self.__completion_delay >= StagedAutomotiveTestCase.__delay_stages

    @property
    def supported_responses(self):
        # type: () -> List[EcuResponse]
        """The supported responses of all TestCases, sorted with
        ``Ecu.sort_key_func``."""
        supported_responses = list()
        for tc in self.test_cases:
            supported_responses += tc.supported_responses

        supported_responses.sort(key=Ecu.sort_key_func)
        return supported_responses

    def runtime_estimation(self):
        # type: () -> Optional[Tuple[int, int, float]]
        """Estimate the progress of this pipeline.

        :return: A tuple of the number of stages, the index of the current
            stage and the overall progress. The progress is the fraction of
            finished stages plus, if the current TestCase provides an
            estimation, its third element scaled by the number of stages.
        """
        stage_count = len(self.test_cases)
        progress = float(self.__stage_index) / stage_count

        if hasattr(self.current_test_case, "runtime_estimation"):
            cur_est = self.current_test_case.runtime_estimation()
            if cur_est:
                progress += cur_est[2] / stage_count

        return stage_count, self.__stage_index, progress
276