• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Nils Weiss <nils@we155.de>
5
6# scapy.contrib.description = AutomotiveTestCaseExecutor base class
7# scapy.contrib.status = library
8
9import abc
10import time
11
12from itertools import product
13
14from scapy.contrib.automotive import log_automotive
15from scapy.contrib.automotive.scanner.graph import Graph
16from scapy.error import Scapy_Exception
17from scapy.supersocket import SuperSocket
18from scapy.utils import make_lined_table, SingleConversationSocket
19from scapy.contrib.automotive.ecu import EcuState, EcuResponse, Ecu
20from scapy.contrib.automotive.scanner.configuration import \
21    AutomotiveTestCaseExecutorConfiguration
22from scapy.contrib.automotive.scanner.test_case import AutomotiveTestCaseABC, \
23    _SocketUnion, _CleanupCallable, StateGenerator, TestCaseGenerator, \
24    AutomotiveTestCase
25
26# Typing imports
27from typing import (
28    Any,
29    Union,
30    List,
31    Optional,
32    Dict,
33    Callable,
34    Type,
35    cast,
36    TypeVar,
37)
38
# Generic placeholder referenced by the type comment of
# AutomotiveTestCaseExecutor.get_test_cases_by_class, which maps
# Type[T] to List[T].
T = TypeVar("T")
40
41
42class AutomotiveTestCaseExecutor(metaclass=abc.ABCMeta):
43    """
44    Base class for different automotive scanners. This class handles
    the connection to a scan target, ensures the execution of all its
46    test cases, and stores the system state machine
47
48
49    :param socket: A socket object to communicate with the scan target
50    :param reset_handler: A function to reset the scan target
51    :param reconnect_handler: In case the communication needs to be
52                              established after a reset, provide a
53                              reconnect function which returns a socket object
54    :param test_cases: A list of TestCase instances or classes
55    :param kwargs: Arguments for the internal
56                   AutomotiveTestCaseExecutorConfiguration instance
57    """
58
59    @property
60    def _initial_ecu_state(self):
61        # type: () -> EcuState
62        return EcuState(session=1)
63
64    def __init__(
65            self,
66            socket,  # type: Optional[_SocketUnion]
67            reset_handler=None,  # type: Optional[Callable[[], None]]
68            reconnect_handler=None,  # type: Optional[Callable[[], _SocketUnion]]  # noqa: E501
69            test_cases=None,
70            # type: Optional[List[Union[AutomotiveTestCaseABC, Type[AutomotiveTestCaseABC]]]]  # noqa: E501
71            **kwargs  # type: Optional[Dict[str, Any]]
72    ):  # type: (...) -> None
73
74        # The TesterPresentSender can interfere with a test_case, since a
75        # target may only allow one request at a time.
76        # The SingleConversationSocket prevents interleaving requests.
77        if socket and not isinstance(socket, SingleConversationSocket):
78            self.socket = SingleConversationSocket(socket)  # type: Optional[_SocketUnion]  # noqa: E501
79        else:
80            self.socket = socket
81
82        self.target_state = self._initial_ecu_state
83        self.reset_handler = reset_handler
84        self.reconnect_handler = reconnect_handler
85
86        self.cleanup_functions = list()  # type: List[_CleanupCallable]
87
88        self.configuration = AutomotiveTestCaseExecutorConfiguration(
89            test_cases or self.default_test_case_clss, **kwargs)
90        self.validate_test_case_kwargs()
91
92    def __reduce__(self):  # type: ignore
93        f, t, d = super(AutomotiveTestCaseExecutor, self).__reduce__()  # type: ignore  # noqa: E501
94        try:
95            del d["socket"]
96        except KeyError:
97            pass
98        try:
99            del d["reset_handler"]
100        except KeyError:
101            pass
102        try:
103            del d["reconnect_handler"]
104        except KeyError:
105            pass
106        return f, t, d
107
108    @property
109    @abc.abstractmethod
110    def default_test_case_clss(self):
111        # type: () -> List[Type[AutomotiveTestCaseABC]]
112        raise NotImplementedError()
113
114    @property
115    def state_graph(self):
116        # type: () -> Graph
117        return self.configuration.state_graph
118
119    @property
120    def state_paths(self):
121        # type: () -> List[List[EcuState]]
122        """
123        Returns all state paths. A path is represented by a list of EcuState
124        objects.
125        :return: A list of paths.
126        """
127        paths = [Graph.dijkstra(self.state_graph, self._initial_ecu_state, s)
128                 for s in self.state_graph.nodes
129                 if s != self._initial_ecu_state]
130        return sorted(
131            [p for p in paths if p] + [[self._initial_ecu_state]],
132            key=lambda x: x[-1])
133
134    @property
135    def final_states(self):
136        # type: () -> List[EcuState]
137        """
138        Returns a list with all final states. A final state is the last
139        state of a path.
140        :return:
141        """
142        return [p[-1] for p in self.state_paths]
143
144    @property
145    def scan_completed(self):
146        # type: () -> bool
147        return all(t.has_completed(s) for t, s in
148                   product(self.configuration.test_cases, self.final_states))
149
150    def reset_target(self):
151        # type: () -> None
152        log_automotive.info("Target reset")
153        if self.reset_handler:
154            self.reset_handler()
155        self.target_state = self._initial_ecu_state
156
157    def reconnect(self):
158        # type: () -> None
159        if self.reconnect_handler:
160            try:
161                if self.socket:
162                    self.socket.close()
163            except Exception as e:
164                log_automotive.exception(
165                    "Exception '%s' during socket.close", e)
166
167            log_automotive.info("Target reconnect")
168            socket = self.reconnect_handler()
169            if not isinstance(socket, SingleConversationSocket):
170                self.socket = SingleConversationSocket(socket)
171            else:
172                self.socket = socket
173
174        if self.socket and self.socket.closed:
175            raise Scapy_Exception(
176                "Socket closed even after reconnect. Stop scan!")
177
178    def execute_test_case(self, test_case, kill_time=None):
179        # type: (AutomotiveTestCaseABC, Optional[float]) -> None
180        """
181        This function ensures the correct execution of a testcase, including
182        the pre_execute, execute and post_execute.
183        Finally, the testcase is asked if a new edge or a new testcase was
184        generated.
185
186        :param test_case: A test case to be executed
187        :param kill_time: If set, this defines the maximum execution time for
188                          the current test_case
189        :return: None
190        """
191
192        if not self.socket:
193            log_automotive.warning("Socket is None! Leaving execute_test_case")
194            return
195
196        test_case.pre_execute(
197            self.socket, self.target_state, self.configuration)
198
199        try:
200            test_case_kwargs = self.configuration[test_case.__class__.__name__]
201        except KeyError:
202            test_case_kwargs = dict()
203
204        if kill_time:
205            max_execution_time = max(int(kill_time - time.monotonic()), 5)
206            cur_execution_time = test_case_kwargs.get("execution_time", 1200)
207            test_case_kwargs["execution_time"] = min(max_execution_time,
208                                                     cur_execution_time)
209
210        log_automotive.debug("Execute test_case %s with args %s",
211                             test_case.__class__.__name__, test_case_kwargs)
212
213        test_case.execute(self.socket, self.target_state, **test_case_kwargs)
214        test_case.post_execute(
215            self.socket, self.target_state, self.configuration)
216
217        self.check_new_states(test_case)
218        self.check_new_testcases(test_case)
219
220        if hasattr(test_case, "runtime_estimation"):
221            estimation = test_case.runtime_estimation()
222            if estimation is not None:
223                log_automotive.debug(
224                    "[i] Test_case %s: TODO %d, "
225                    "DONE %d, TOTAL %0.2f",
226                    test_case.__class__.__name__, estimation[0],
227                    estimation[1], estimation[2])
228
229    def check_new_testcases(self, test_case):
230        # type: (AutomotiveTestCaseABC) -> None
231        if isinstance(test_case, TestCaseGenerator):
232            new_test_case = test_case.get_generated_test_case()
233            if new_test_case:
234                log_automotive.debug("Testcase generated %s", new_test_case)
235                self.configuration.add_test_case(new_test_case)
236
237    def check_new_states(self, test_case):
238        # type: (AutomotiveTestCaseABC) -> None
239        if not self.socket:
240            log_automotive.warning("Socket is None! Leaving check_new_states")
241            return
242
243        if isinstance(test_case, StateGenerator):
244            edge = test_case.get_new_edge(self.socket, self.configuration)
245            if edge:
246                log_automotive.debug("Edge found %s", edge)
247                tf = test_case.get_transition_function(self.socket, edge)
248                self.state_graph.add_edge(edge, tf)
249
250    def validate_test_case_kwargs(self):
251        # type: () -> None
252        for test_case in self.configuration.test_cases:
253            if isinstance(test_case, AutomotiveTestCase):
254                test_case_kwargs = self.configuration[test_case.__class__.__name__]
255                test_case.check_kwargs(test_case_kwargs)
256
257    def stop_scan(self):
258        # type: () -> None
259        self.configuration.stop_event.set()
260        log_automotive.debug("Internal stop event set!")
261
262    def progress(self):
263        # type: () -> float
264        progress = []
265        for tc in self.configuration.test_cases:
266            if not hasattr(tc, "runtime_estimation"):
267                continue
268            est = tc.runtime_estimation()
269            if est is None:
270                continue
271            progress.append(est[2])
272
273        return sum(progress) / len(progress) if len(progress) else 0.0
274
    def scan(self, timeout=None):
        # type: (Optional[int]) -> None
        """
        Executes all testcases for a given time.

        The scan works in rounds. In each round, every combination of a
        known state path and a configured test case is visited. Combinations
        whose test case has already completed the final state of the path
        are skipped; for the others, the path is entered and the test case
        is executed. Rounds are repeated until the timeout is reached, the
        stop event is set, or a round did not execute any test case.
        At the end, the collected cleanup functions are executed and the
        target is reset.

        :param timeout: Time for execution. None means no time limit.
        :return: None
        :raises OSError, ValueError, Scapy_Exception: Re-raised from a test
            case execution if the configuration is in debug mode, or if the
            socket is closed and no reconnect handler is available.
        """
        # A preceding stop_scan() leaves the stop event set
        self.configuration.stop_event.clear()
        # kill_time is an absolute deadline on the monotonic clock
        if timeout is None:
            kill_time = None
        else:
            kill_time = time.monotonic() + timeout
        while kill_time is None or kill_time > time.monotonic():
            test_case_executed = False
            log_automotive.info("[i] Scan progress %0.2f", self.progress())
            log_automotive.debug("[i] Scan paths %s", self.state_paths)
            # itertools.product reads both inputs completely before it
            # yields the first pair, so paths and test cases found during
            # this round are visited in the next round of the while loop.
            for p, test_case in product(
                    self.state_paths, self.configuration.test_cases):
                log_automotive.info("Scan path %s", p)
                terminate = kill_time and kill_time <= time.monotonic()
                # NOTE: the message below is also logged if the round is
                # left because of the stop event.
                if terminate or self.configuration.stop_event.is_set():
                    log_automotive.debug(
                        "Execution time exceeded. Terminating scan!")
                    break

                final_state = p[-1]
                if test_case.has_completed(final_state):
                    log_automotive.debug("State %s for %s completed",
                                         repr(final_state), test_case)
                    continue

                try:
                    if not self.enter_state_path(p):
                        log_automotive.error(
                            "Error entering path %s", p)
                        continue
                    log_automotive.info(
                        "Execute %s for path %s", str(test_case), p)
                    self.execute_test_case(test_case, kill_time)
                    test_case_executed = True
                except (OSError, ValueError, Scapy_Exception) as e:
                    log_automotive.exception("Exception: %s", e)
                    # In debug mode, errors are not tolerated
                    if self.configuration.debug:
                        raise e
                    if isinstance(e, OSError):
                        log_automotive.exception(
                            "OSError occurred, closing socket")
                        if self.socket:
                            self.socket.close()
                    # Without a reconnect handler, a closed socket can not
                    # be replaced, so the scan can not continue.
                    if (self.socket
                            and cast(SuperSocket, self.socket).closed
                            and self.reconnect_handler is None):
                        log_automotive.critical(
                            "Socket went down. Need to leave scan")
                        raise e
                finally:
                    # Runs after every attempt, including the `continue`
                    # and the re-raise cases above
                    self.cleanup_state()

            # No test case was executed in the whole round
            if not test_case_executed:
                log_automotive.info(
                    "Execute failure or scan completed. Exit scan!")
                break

        self.cleanup_state()
        self.reset_target()
340
341    def enter_state_path(self, path):
342        # type: (List[EcuState]) -> bool
343        """
344        Resets and reconnects to a target and applies all transition functions
345        to traversal a given path.
346        :param path: Path to be applied to the scan target.
347        :return: True, if all transition functions could be executed.
348        """
349        if path[0] != self._initial_ecu_state:
350            raise Scapy_Exception(
351                "Initial state of path not equal reset state of the target")
352
353        self.reset_target()
354        self.reconnect()
355
356        if len(path) == 1:
357            return True
358
359        for next_state in path[1:]:
360            if self.configuration.stop_event.is_set():
361                self.cleanup_state()
362                return False
363
364            edge = (self.target_state, next_state)
365            self.configuration.stop_event.wait(
366                timeout=self.configuration.delay_enter_state)
367            if not self.enter_state(*edge):
368                self.state_graph.downrate_edge(edge)
369                self.cleanup_state()
370                return False
371        return True
372
373    def enter_state(self, prev_state, next_state):
374        # type: (EcuState, EcuState) -> bool
375        """
376        Obtains a transition function from the system state graph and executes
377        it. On success, the cleanup function is added for a later cleanup of
378        the new state.
379        :param prev_state: Current state
380        :param next_state: Desired state
381        :return: True, if state could be changed successful
382        """
383        if not self.socket:
384            log_automotive.warning("Socket is None! Leaving enter_state")
385            return False
386
387        edge = (prev_state, next_state)
388        funcs = self.state_graph.get_transition_tuple_for_edge(edge)
389
390        if funcs is None:
391            log_automotive.error("No transition function for %s", edge)
392            return False
393
394        trans_func, trans_kwargs, clean_func = funcs
395        state_changed = trans_func(
396            self.socket, self.configuration, trans_kwargs)
397        if state_changed:
398            self.target_state = next_state
399
400            if clean_func is not None:
401                self.cleanup_functions += [clean_func]
402            return True
403        else:
404            log_automotive.info("Transition for edge %s failed", edge)
405            return False
406
407    def cleanup_state(self):
408        # type: () -> None
409        """
410        Executes all collected cleanup functions from a traversed path
411        :return: None
412        """
413        if not self.socket:
414            log_automotive.warning("Socket is None! Leaving cleanup_state")
415            return
416
417        for f in self.cleanup_functions:
418            if not callable(f):
419                continue
420            try:
421                if not f(self.socket, self.configuration):
422                    log_automotive.info(
423                        "Cleanup function %s failed", repr(f))
424            except (OSError, ValueError, Scapy_Exception) as e:
425                log_automotive.critical("Exception during cleanup: %s", e)
426
427        self.cleanup_functions = list()
428
429    def show_testcases(self):
430        # type: () -> None
431        for t in self.configuration.test_cases:
432            t.show()
433
434    def show_testcases_status(self):
435        # type: () -> None
436        data = list()
437        for t in self.configuration.test_cases:
438            for s in self.state_graph.nodes:
439                data += [(repr(s), t.__class__.__name__, t.has_completed(s))]
440        make_lined_table(data, lambda *tup: (tup[0], tup[1], tup[2]))
441
442    def get_test_cases_by_class(self, cls):
443        # type: (Type[T]) -> List[T]
444        return [x for x in self.configuration.test_cases if isinstance(x, cls)]
445
446    @property
447    def supported_responses(self):
448        # type: () -> List[EcuResponse]
449        """
450        Returns a sorted list of supported responses, gathered from all
451        enumerators. The sort is done in a way
452        to provide the best possible results, if this list of supported
453        responses is used to simulate an real world Ecu with the
454        EcuAnsweringMachine object.
455        :return: A sorted list of EcuResponse objects
456        """
457        supported_responses = list()
458        for tc in self.configuration.test_cases:
459            supported_responses += tc.supported_responses
460
461        supported_responses.sort(key=Ecu.sort_key_func)
462        return supported_responses
463