• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
% Regression tests for enumerators
~ linux

+ Load general modules

= Load contribution layer
7
8from scapy.contrib.automotive.scanner.enumerator import _AutomotiveTestCaseScanResult, ServiceEnumerator, StateGenerator, StateGeneratingServiceEnumerator
9from scapy.contrib.automotive.scanner.test_case import TestCaseGenerator, AutomotiveTestCase
10from scapy.contrib.automotive.scanner.executor import AutomotiveTestCaseExecutor
11from scapy.contrib.isotp import ISOTP
12from scapy.contrib.automotive.uds import *
13from scapy.contrib.automotive.scanner.staged_test_case import StagedAutomotiveTestCase
14from scapy.utils import SingleConversationSocket
15from scapy.contrib.automotive.ecu import EcuState, EcuResponse
16from scapy.contrib.automotive.uds_ecu_states import *
17import copy
18
19+ Basic checks
20= ServiceEnumerator basecls checks
21
# Hand-built scan results used as the fixture for the ServiceEnumerator checks.
# Arguments are (state, request, response, request timestamp, response
# timestamp). The four entries cover: a positive response, no response at all,
# and two negative responses (0x7f) in different sessions.
pkts = [
    _AutomotiveTestCaseScanResult(EcuState(session=1), UDS(b"\x20abcd"), UDS(b"\x60abcd"), 1.0, 1.9),
    _AutomotiveTestCaseScanResult(EcuState(session=2), UDS(b"\x20abcd"), None, 2.0, None),
    _AutomotiveTestCaseScanResult(EcuState(session=1), UDS(b"\x21abcd"), UDS(b"\x7fabcd"), 3.0, 3.1),
    _AutomotiveTestCaseScanResult(EcuState(session=2), UDS(b"\x21abcd"), UDS(b"\x7fa\x10cd"), 4.0, 4.5),
]
28
class MyTestCase(ServiceEnumerator):
    """Minimal concrete ServiceEnumerator used as the fixture in this campaign.

    It requests UDS services 1 through 10 and provides the table/label hooks
    that the base class calls.
    """

    # Work on a copy so that update() below does not modify the dict owned by
    # ServiceEnumerator, then register the extra keys the executor tests pass.
    _supported_kwargs = copy.copy(ServiceEnumerator._supported_kwargs)
    _supported_kwargs.update({
        'local_kwarg': ((int, str), None),
        'verbose': (bool, None),
        'global_arg': (str, None)
    })

    def _get_initial_requests(self, **kwargs):
        # type: (Any) -> Iterable[Packet]
        """Return a UDS packet whose service field is range(1, 11)."""
        return UDS(service=range(1, 11))

    def _get_table_entry_y(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        """Return the row label: hex service id plus its sprintf'd name.

        tup[1] is presumably the request (second constructor argument of
        _AutomotiveTestCaseScanResult as used in this file).
        """
        return "0x%02x: %s" % (tup[1].service, tup[1].sprintf("%UDS.service%"))

    def _get_table_entry_z(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        """Return the cell label for tup[2], using "PR: Supported" as the
        label passed to _get_label."""
        return self._get_label(tup[2], "PR: Supported")

    @staticmethod
    def _get_negative_response_label(response):
        # type: (Packet) -> str
        """Format a negative response as "NR: <negativeResponseCode>"."""
        return response.sprintf("NR: %UDS_NR.negativeResponseCode%")

    @staticmethod
    def _get_negative_response_code(resp):
        # type: (Packet) -> int
        """Return the negativeResponseCode field of resp."""
        return resp.negativeResponseCode

    @staticmethod
    def _get_negative_response_desc(nrc):
        # type: (int) -> str
        """Return the textual form of a negative response code by building
        a UDS_NR packet with it and formatting that field."""
        return UDS_NR(negativeResponseCode=nrc).sprintf(
            "%UDS_NR.negativeResponseCode%")
58
59
# Load the hand-built results into a fresh enumerator. The timestamps stored
# in each result are copied onto the packets before the result is stored.
e = MyTestCase()
for p in pkts:
    p.req.time = p.req_ts
    p.req.sent_time = p.req_ts
    # Unanswered requests carry resp=None and have no timestamp to set.
    if p.resp is not None:
        p.resp.time = p.resp_ts
    e._store_result(p.state, p.req, p.resp)
67
68
69= ServiceEnumerator not completed check
70
71assert e.completed == False
72
73= ServiceEnumerator completed
74
75e._state_completed[EcuState(session=1)] = True
76e._state_completed[EcuState(session=2)] = True
77
78assert e.completed
79
80= ServiceEnumerator stats check
81
82stat_list = e._compute_statistics()
83
84stats = {label: value for state, label, value in stat_list if state == "all"}
85print(stats)
86
87assert stats["num_answered"] == '3'
88assert stats["num_unanswered"] == '1'
89assert stats["answertime_max"] == '0.9'
90assert stats["answertime_min"] == '0.1'
91assert stats["answertime_avg"] == '0.5'
92assert stats["num_negative_resps"] == '2'
93
94= ServiceEnumerator scanned states
95
96assert len(e.scanned_states) == 2
97assert {EcuState(session=1), EcuState(session=2)} == e.scanned_states
98
99= ServiceEnumerator scanned results
100
101assert len(e.results_with_positive_response) == 1
102assert len(e.results_with_negative_response) == 2
103assert len(e.results_without_response) == 1
104assert len(e.results_with_response) == 3
105
106= ServiceEnumerator get_label
107assert e._get_label(pkts[0].resp) == "PR: PositiveResponse"
108assert e._get_label(pkts[0].resp, lambda _: "positive") == "positive"
109assert e._get_label(pkts[0].resp, lambda _: "positive" + hex(pkts[0].req.service)) == "positive" + "0x20"
110assert e._get_label(pkts[1].resp) == "Timeout"
111assert e._get_label(pkts[2].resp) == "NR: 98"
112assert e._get_label(pkts[3].resp) == "NR: generalReject"
113
114= ServiceEnumerator show
115
116e.show(filtered=False)
117
118dump = e.show(dump=True, filtered=False)
119assert "NR: 98" in dump
120assert "NR: generalReject" in dump
121assert "PR: Supported" in dump
122assert "Timeout" in dump
123assert "session1" in dump
124assert "session2" in dump
125assert "0x20" in dump
126assert "0x21" in dump
127
128= ServiceEnumerator filtered results before show
129
130print(len(e.filtered_results))
131assert len(e.filtered_results) == 2
132assert e.filtered_results[0] == pkts[0]
133assert e.filtered_results[1] == pkts[2]
134
135= ServiceEnumerator show filtered
136
137e.show(filtered=True)
138
139dump = e.show(dump=True, filtered=True)
140assert "NR: 98" in dump
141assert "NR: generalReject" in dump
142assert "PR: Supported" in dump
143assert "Timeout" not in dump
144assert "session1" in dump
145assert "session2" in dump
146assert "all" in dump
147assert "0x20" in dump
148assert "0x21" in dump
149assert "The following negative response codes are blacklisted: ['serviceNotSupported']" in dump
150
151= ServiceEnumerator filtered results after show
152
153assert len(e.filtered_results) == 3
154assert e.filtered_results[0] == pkts[0]
155assert e.filtered_results[1] == pkts[2]
156
157= ServiceEnumerator supported responses
158
159assert len(e.supported_responses) == 3
160
161= ServiceEnumerator evaluate response
162
163conf = {}
164
165assert False == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), None, **conf)
166assert False == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x7f\x10\x10"), **conf)
167conf = {"exit_if_service_not_supported": True, "retry_if_busy_returncode": False}
168assert False == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x7f\x10\x21"), **conf)
169assert False == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x7f\x10\x10"), **conf)
170assert True == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x7f\x10\x11"), **conf)
171assert True == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x7f\x10\x7f"), **conf)
172conf = {"exit_if_service_not_supported": False, "retry_if_busy_returncode": True}
173assert not e._retry_pkt[EcuState(session=1)]
174assert False == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x7f\x10\x10"), **conf)
175assert False == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x7f\x10\x11"), **conf)
176assert False == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x7f\x10\x7f"), **conf)
177assert not e._retry_pkt[EcuState(session=1)]
178assert True == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x7f\x10\x21"), **conf)
179assert e._retry_pkt[EcuState(session=1)] == UDS(b"\x10\x03abcd")
180assert False == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x7f\x10\x21"), **conf)
181assert not e._retry_pkt[EcuState(session=1)]
182
183assert True == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), UDS(b"\x50\x03\x00"), **conf)
184assert False == e._evaluate_response(EcuState(session=1), UDS(b"\x11\x03abcd"), UDS(b"\x51\x03\x00"), **conf)
185conf = {"retry_if_none_received": True}
186assert True == e._evaluate_response(EcuState(session=1), UDS(b"\x10\x03abcd"), None, **conf)
187assert e._retry_pkt[EcuState(session=1)]
188
189
190= ServiceEnumerator execute
191
192from queue import Queue
193from scapy.supersocket import SuperSocket
194
class MockISOTPSocket(SuperSocket):
    """In-memory stand-in for an ISOTP socket.

    Payloads to be "received" are taken from ``rcvd_queue``; everything sent
    is recorded as raw bytes in ``sent_queue``.
    """
    nonblocking_socket = True

    @property
    def closed(self):
        # The mock always reports itself as open.
        return False

    @closed.setter
    def closed(self, var):
        # Assignments to ``closed`` are accepted and ignored.
        pass

    def __init__(self, rcvd_queue=None):
        """Create the two queues and preload ``rcvd_queue`` if given.

        :param rcvd_queue: optional iterable of payloads to enqueue for
            reception, in order.
        """
        # NOTE(review): SuperSocket.__init__ is not called here; presumably
        # intentional so that no real socket is created -- confirm.
        self.rcvd_queue = Queue()
        self.sent_queue = Queue()
        if rcvd_queue is not None:
            for c in rcvd_queue:
                self.rcvd_queue.put(c)

    def recv_raw(self, x=MTU):
        """Return ``(UDS, payload, 10.0)`` for the next queued payload.

        Waits at most 0.01 s; ``queue.Empty`` propagates to the caller when
        nothing is queued.
        """
        pkt = bytes(self.rcvd_queue.get(True, 0.01))
        return UDS, pkt, 10.0

    def send(self, x):
        """Record ``raw(x)`` in ``sent_queue`` and return its length."""
        sx = raw(x)
        try:
            # Fixed send timestamp; objects that reject the attribute are
            # sent anyway.
            x.sent_time = 9.0
        except AttributeError:
            pass
        self.sent_queue.put(sx)
        return len(sx)

    @staticmethod
    def select(sockets, remain=None):
        """Return ``sockets`` unchanged after a zero-length sleep."""
        time.sleep(0)
        return sockets

    def sr(self, *args, **kargs):
        """Send and receive via ``sendrecv.sndrcv`` with ``threaded=False``."""
        from scapy import sendrecv
        return sendrecv.sndrcv(self, *args, threaded=False, **kargs)

    def sr1(self, *args, **kargs):
        """Like ``sr`` but return only the first answer, or None if there
        is none."""
        from scapy import sendrecv
        ans = sendrecv.sndrcv(self, *args, threaded=False, **kargs)[0]  # type: SndRcvList
        if len(ans) > 0:
            pkt = ans[0][1]  # type: Packet
            return pkt
        return None
235
236sock = MockISOTPSocket()
237sock.rcvd_queue.put(b"\x41")
238sock.rcvd_queue.put(b"\x42")
239sock.rcvd_queue.put(b"\x43")
240sock.rcvd_queue.put(b"\x44")
241sock.rcvd_queue.put(b"\x45")
242sock.rcvd_queue.put(b"\x46")
243sock.rcvd_queue.put(b"\x47")
244sock.rcvd_queue.put(b"\x48")
245sock.rcvd_queue.put(b"\x49")
246sock.rcvd_queue.put(b"\x4A")
247
248e = MyTestCase()
249
250e.execute(sock, EcuState(session=1))
251
252assert len(e.filtered_results) == 10
253assert len(e.results_with_response) == 10
254assert len(e.results_without_response) == 0
255
256assert e.has_completed(EcuState(session=1))
257assert e.completed
258
259e.execute(sock, EcuState(session=2), timeout=0.01)
260
261assert len(e.filtered_results) == 10
262assert len(e.results_with_response) == 10
263assert len(e.results_without_response) == 10
264
265assert e.has_completed(EcuState(session=2))
266
267e.execute(sock, EcuState(session=3), timeout=0.01, exit_if_no_answer_received=True)
268
269assert not e.has_completed(EcuState(session=3))
270assert not e.completed
271assert len(e.scanned_states) == 3
272
273e.execute(sock, EcuState(session=42), state_block_list=[EcuState(session=42)])
274
275assert e.has_completed(EcuState(session=42))
276assert len(e.scanned_states) == 3
277
278e.execute(sock, EcuState(session=13), state_block_list=EcuState(session=13))
279
280assert e.has_completed(EcuState(session=13))
281assert len(e.scanned_states) == 3
282
283e.execute(sock, EcuState(session=41), state_allow_list=[EcuState(session=42)])
284
285assert e.has_completed(EcuState(session=41))
286assert len(e.scanned_states) == 3
287
288e.execute(sock, EcuState(session=12), state_allow_list=EcuState(session=13))
289
290assert e.has_completed(EcuState(session=12))
291assert len(e.scanned_states) == 3
292
293= Test negative response code service not supported
294
295sock.rcvd_queue.put(b"\x7f\x01\x11")
296sock.rcvd_queue.put(b"\x7f\x01\x7f")
297
298e = MyTestCase()
299
300e.execute(sock, EcuState(session=1), exit_if_service_not_supported=True)
301
302assert not e._retry_pkt[EcuState(session=1)]
303assert len(e.results_with_response) == 1
304assert len(e.results_with_negative_response) == 1
305assert e.completed
306
307e.execute(sock, EcuState(session=2), exit_if_service_not_supported=True)
308
309assert not e._retry_pkt[EcuState(session=2)]
310assert len(e.results_with_response) == 2
311assert len(e.results_with_negative_response) == 2
312assert e.completed
313
314= Test negative response code retry if busy
315
316sock.rcvd_queue.put(b"\x7f\x01\x21")
317sock.rcvd_queue.put(b"\x7f\x01\x10")
318
319e = MyTestCase()
320
321e.execute(sock, EcuState(session=1))
322
323assert e._retry_pkt[EcuState(session=1)]
324assert len(e.results_with_response) == 1
325assert len(e.results_with_negative_response) == 1
326assert len(e.results_without_response) == 0
327assert not e.completed
328
329e.execute(sock, EcuState(session=1))
330
331assert not e._retry_pkt[EcuState(session=1)]
332assert len(e.results_with_response) == 2
333assert len(e.results_with_negative_response) == 2
334assert len(e.results_without_response) == 9
335assert e.completed
336assert e.has_completed(EcuState(session=1))
337
338= Test negative response code don't retry if busy
339
340sock.rcvd_queue.put(b"\x7f\x01\x21")
341
342e = MyTestCase()
343
344e.execute(sock, EcuState(session=1), retry_if_busy_returncode=False)
345
346assert not e._retry_pkt[EcuState(session=1)]
347assert len(e.results_with_response) == 1
348assert len(e.results_with_negative_response) == 1
349assert len(e.results_without_response) == 9
350assert e.completed
351assert e.has_completed(EcuState(session=1))
352
353= Test execution time
354
355sock.rcvd_queue.put(b"\x7f\x01\x10")
356
357e = MyTestCase()
358
359e.execute(sock, EcuState(session=1), execution_time=-1)
360
361assert not e._retry_pkt[EcuState(session=1)]
362assert len(e.results_with_response) == 1
363assert len(e.results_with_negative_response) == 1
364assert len(e.results_without_response) == 0
365assert not e.completed
366assert not e.has_completed(EcuState(session=1))
367
368
369+ AutomotiveTestCaseExecutorConfiguration tests
370
371= Definitions
372
class MockSock(object):
    """Socket stub that reports itself open and fails every sr1() call."""
    closed = False

    def sr1(self, *args, **kwargs):
        """Always raise OSError, whatever the arguments."""
        raise OSError
377
class TestCase1(MyTestCase):
    """MyTestCase under a distinct class name, so that the executor
    configuration can be addressed per class (config.TestCase1)."""
    pass


class TestCase2(MyTestCase):
    """Second distinctly named MyTestCase subclass (config.TestCase2)."""
    pass
383
class Scanner(AutomotiveTestCaseExecutor):
    """Executor under test; MyTestCase is its only default test case class."""

    @property
    def default_test_case_clss(self):
        # type: () -> List[Type[AutomotiveTestCaseABC]]
        """Return the test case classes used when none are passed in."""
        return [MyTestCase]
389
390= Basic tests
391
392tce = Scanner(MockSock(), test_cases=[TestCase1, TestCase2, MyTestCase],
393              verbose=True, debug=True,
394              global_arg="Whatever", TestCase1_kwargs={"local_kwarg": 42})
395
396config = tce.configuration  # type: AutomotiveTestCaseExecutorConfiguration
397assert config.verbose
398assert config.debug
399assert len(config.test_cases) == 3
400assert len(config.stages) == 0
401assert len(config.staged_test_cases) == 0
402assert len(config.test_case_clss) == 3
403assert len(config.TestCase1.items()) == 5
404assert len(config.TestCase2.items()) == 4
405assert len(config["TestCase1"].items()) == 5
406assert len(config.MyTestCase.items()) == 4
407assert config.TestCase1["verbose"]
408assert config.TestCase1["debug"]
409assert config.TestCase1["local_kwarg"] == 42
410assert config.TestCase1["global_arg"] == "Whatever"
411assert config.TestCase2["global_arg"] == "Whatever"
412assert config.MyTestCase["global_arg"] == "Whatever"
413assert isinstance(tce.socket, SingleConversationSocket)
414
415
416= Basic tests with default values
417
418tce = Scanner(MockSock())
419
420config = tce.configuration  # type: AutomotiveTestCaseExecutorConfiguration
421assert not config.verbose
422assert not config.debug
423assert len(config.test_cases) == 1
424assert len(config.MyTestCase.items()) == 1
425assert isinstance(tce.socket, SingleConversationSocket)
426
427
428= Basic test with stages
429
def connector(testcase1, _):
    """Stage connector: build kwargs for the next stage from the previous one.

    :param testcase1: object exposing a sized ``results`` attribute.
    :param _: second positional argument, unused here.
    :return: dict with ``verbose`` set to True and ``scan_range`` set to the
        number of results of ``testcase1``.
    """
    scan_range = len(testcase1.results)
    return {"verbose": True, "scan_range": scan_range}
433
434tc1 = TestCase1()
435tc2 = TestCase2()
436
437pipeline = StagedAutomotiveTestCase([tc1, tc2], [None, connector])
438
439tce = Scanner(MockSock(), test_cases=[pipeline])
440
441config = tce.configuration  # type: AutomotiveTestCaseExecutorConfiguration
442assert not config.verbose
443assert not config.debug
444assert len(config.test_cases) == 1
445assert len(config.stages) == 1
446assert len(config.staged_test_cases) == 2
447assert len(config.test_case_clss) == 3
448assert len(config.StagedAutomotiveTestCase.items()) == 1
449assert isinstance(tce.socket, SingleConversationSocket)
450
451= Basic tests with two stages
452
def connector(testcase1, testcase2):
    """Stage connector: build kwargs for the next stage from the previous one.

    :param testcase1: object exposing a sized ``results`` attribute.
    :param testcase2: second positional argument, not read here.
    :return: dict with ``verbose`` set to True and ``scan_range`` set to the
        number of results of ``testcase1``.
    """
    scan_range = len(testcase1.results)
    return {"verbose": True, "scan_range": scan_range}
456
457tc1 = TestCase1()
458tc2 = TestCase2()
459
460pipeline = StagedAutomotiveTestCase([tc1, tc2], [None, connector])
461
class StagedTest(StagedAutomotiveTestCase):
    """StagedAutomotiveTestCase under a distinct class name, so that it gets
    its own configuration entry (config.StagedTest)."""
    pass
464
465pipeline2 = StagedTest([MyTestCase(), MyTestCase()])
466
467tce = Scanner(MockSock(), test_cases=[pipeline, pipeline2], verbose=True)
468
469config = tce.configuration  # type: AutomotiveTestCaseExecutorConfiguration
470assert config.verbose
471assert not config.debug
472assert len(config.test_cases) == 2
473assert len(config.stages) == 2
474assert len(config.staged_test_cases) == 4
475assert len(config.test_case_clss) == 5
476assert len(config.StagedAutomotiveTestCase.items()) == 2
477assert len(config.StagedTest.items()) == 2
478assert len(config.TestCase1.items()) == 2
479assert len(config.TestCase2.items()) == 2
480assert len(config.MyTestCase.items()) == 2
481
482assert isinstance(tce.socket, SingleConversationSocket)
483
484assert len(tce.state_paths) == 1
485assert len(tce.final_states) == 1
486
487tce.state_graph.add_edge((tce.final_states[0], EcuState(session=2)))
488
489assert len(tce.state_paths) == 2
490assert len(tce.final_states) == 2
491
492assert not tce.scan_completed
493
494
495= Reset Handler tests
496
# Set by reset_func() so the test can verify the reset handler was invoked.
reset_flag = False

def reset_func():
    """Reset handler stub: record the call in the global ``reset_flag``."""
    global reset_flag
    reset_flag = True
502
503tce = Scanner(MockSock(), reset_handler=reset_func)
504tce.target_state = EcuState(session=2)
505tce.reset_target()
506
507assert reset_flag
508assert tce.target_state == EcuState(session=1)
509
510= Reset Handler tests 2
511
512tce = Scanner(MockSock())
513tce.target_state = EcuState(session=2)
514tce.reset_target()
515
516assert tce.target_state == EcuState(session=1)
517
518= Reconnect Handler tests
519
class MockSocket2:
    """Bare socket stub returned by the reconnect handler; reports open."""
    closed = False

def reconnect_func():
    """Reconnect handler stub: return a new MockSocket2 on every call."""
    return MockSocket2()
525
526tce = Scanner(MockSock(), reconnect_handler=reconnect_func)
527
528print(tce.socket)
529print(repr(tce.socket))
530assert isinstance(tce.socket._inner, MockSock)
531tce.reconnect()
532assert isinstance(tce.socket._inner, MockSocket2)
533
534= Reconnect Handler tests 2
535
# Set by MockSocket1.close() so the test can verify the old socket was closed.
closed = False

class MockSocket1:
    """Socket stub whose close() call is recorded in the global ``closed``."""
    closed = False

    def close(self):
        """Record the call; the class attribute ``closed`` is left untouched."""
        global closed
        closed = True

class MockSocket2:
    """Bare socket stub returned by the reconnect handler; reports open."""
    closed = False

def reconnect_func():
    """Reconnect handler stub: return a new MockSocket2 on every call."""
    return MockSocket2()
549
550tce = Scanner(MockSocket1(), reconnect_handler=reconnect_func)
551
552print(tce.socket)
553print(repr(tce.socket))
554assert isinstance(tce.socket._inner, MockSocket1)
555tce.reconnect()
556assert isinstance(tce.socket._inner, MockSocket2)
557assert closed
558
559= TestCase execute
560
# Flags set by the three TestCase42 hooks so the test can verify each one ran.
pre_exec = False
execute = False
post_exec = False

class TestCase42(MyTestCase):
    """Test case whose hooks check the configuration handed over by the
    executor and record that they were called."""

    def pre_execute(self,
                    socket,  # type: _SocketUnion
                    state,  # type: EcuState
                    global_configuration  # type: AutomotiveTestCaseExecutorConfiguration  # noqa: E501
                    ):  # type: (...) -> None
        """Check the initial configuration, then overwrite ``local_kwarg``
        so that execute() and post_execute() can observe the change."""
        global pre_exec
        assert state == EcuState(session=1)
        assert global_configuration.TestCase42["local_kwarg"] == 42
        assert global_configuration.TestCase42["verbose"]
        assert global_configuration.TestCase42["debug"]
        global_configuration.TestCase42["local_kwarg"] = 1
        pre_exec = True

    def execute(self, socket, state, local_kwarg, verbose, debug, **kwargs):
        """Check that the kwargs reflect the value written by pre_execute()."""
        global execute
        assert verbose
        assert debug
        assert local_kwarg == 1
        execute = True

    def post_execute(self,
                     socket,  # type: _SocketUnion
                     state,  # type: EcuState
                     global_configuration  # type: AutomotiveTestCaseExecutorConfiguration  # noqa: E501
                     ):  # type: (...) -> None
        """Check that the modified configuration is still in place."""
        global post_exec
        assert global_configuration.TestCase42["local_kwarg"] == 1
        assert global_configuration.TestCase42["verbose"]
        assert global_configuration.TestCase42["debug"]
        post_exec = True
594
595
596tce = Scanner(MockSock(), test_cases=[TestCase42],
597              verbose=True, debug=True,
598              TestCase42_kwargs={"local_kwarg": 42})
599
600tce.execute_test_case(TestCase42())
601assert pre_exec == execute == post_exec == True
602
603
604= TestCase execute StateGenerator
605
# Set by transition_func() so the test can verify the transition was executed.
transition_done = False

def transition_func(sock, conf, kwargs):
    """Transition stub: validate its inputs, record the call, report success.

    :param sock: unused.
    :param conf: configuration object; ``conf.TestCase43["local_kwarg"]``
        must equal "world".
    :param kwargs: dict; ``kwargs["arg42"]`` must equal "hello".
    :return: True
    :raises AssertionError: if either expected value is wrong.
    """
    assert kwargs["arg42"] == "hello"
    assert conf.TestCase43["local_kwarg"] == "world"
    global transition_done
    transition_done = True
    return True
614
class TestCase43(MyTestCase, StateGenerator):
    """State-generating test case reporting the edge session 1 -> session 2,
    with a transition function and no cleanup function."""

    def get_new_edge(self, socket, config):
        """Return the (from, to) state pair after checking the configuration."""
        assert config.TestCase43["local_kwarg"] == "world"
        return EcuState(session=1), EcuState(session=2)

    def get_transition_function(self, socket, edge):
        """Return (transition function, its kwargs, None) for ``edge``."""
        assert edge[0] == EcuState(session=1)
        assert edge[1] == EcuState(session=2)
        return transition_func, {"arg42": "hello"}, None

    def execute(self, socket, state, **kwargs):
        """Do nothing and return True."""
        return True
625
626
627tce = Scanner(MockSock(), test_cases=[TestCase43],
628              TestCase43_kwargs={"local_kwarg": "world"})
629
630assert len(tce.final_states) == 1
631
632tce.execute_test_case(TestCase43())
633
634assert len(tce.final_states) == 2
635assert EcuState(session=1) in tce.final_states and EcuState(session=2) in tce.final_states
636assert tce.enter_state(EcuState(session=1), EcuState(session=2))
637assert transition_done
638
639
640= TestCase execute StateGenerator no edge
641
class TestCase43(MyTestCase, StateGenerator):
    """State-generating test case that never reports a new edge."""

    def get_new_edge(self, socket, config):
        """Check the configuration and return None (no edge)."""
        assert config.TestCase43["local_kwarg"] == "world"
        return None

    def execute(self, socket, state, **kwargs):
        """Do nothing and return True."""
        return True

    def get_transition_function(self, socket, edge):
        """Always raise NotImplementedError."""
        raise NotImplementedError()
650
651tce = Scanner(MockSock(), test_cases=[TestCase43],
652              TestCase43_kwargs={"local_kwarg": "world"})
653
654assert len(tce.final_states) == 1
655
656tce.execute_test_case(TestCase43())
657
658assert len(tce.final_states) == 1
659assert EcuState(session=1) in tce.final_states
660assert not tce.enter_state(EcuState(session=1), EcuState(session=2))
661
662
663= TestCase execute StateGenerator with cleanupfunc
664
# Set by the stubs below so the test can verify that each one was called.
transition_done = False
cleanup_done = False

def transition_func(sock, conf, kwargs):
    """Transition stub: validate its inputs, record the call, return True.

    :raises AssertionError: if ``kwargs["arg42"]`` is not "hello" or
        ``conf.TestCase43["local_kwarg"]`` is not "world".
    """
    assert kwargs["arg42"] == "hello"
    assert conf.TestCase43["local_kwarg"] == "world"
    global transition_done
    transition_done = True
    return True

def cleanup_func(sock, conf):
    """Cleanup stub: validate the configuration, record the call, return True.

    :raises AssertionError: if ``conf.TestCase43["local_kwarg"]`` is not
        "world".
    """
    assert conf.TestCase43["local_kwarg"] == "world"
    global cleanup_done
    cleanup_done = True
    return True
680
class TestCase43(MyTestCase, StateGenerator):
    """State-generating test case reporting the edge session 1 -> session 2,
    with both a transition function and a cleanup function."""

    def get_new_edge(self, socket, config):
        """Return the (from, to) state pair after checking the configuration."""
        assert config.TestCase43["local_kwarg"] == "world"
        return EcuState(session=1), EcuState(session=2)

    def get_transition_function(self, socket, edge):
        """Return (transition function, its kwargs, cleanup function)."""
        assert edge[0] == EcuState(session=1)
        assert edge[1] == EcuState(session=2)
        return transition_func, {"arg42": "hello"}, cleanup_func

    def execute(self, socket, state, **kwargs):
        """Do nothing and return True."""
        return True
691
692
693tce = Scanner(MockSock(), test_cases=[TestCase43],
694              TestCase43_kwargs={"local_kwarg": "world"})
695
696assert len(tce.final_states) == 1
697
698tce.execute_test_case(TestCase43())
699
700assert len(tce.final_states) == 2
701assert EcuState(session=1) in tce.final_states and EcuState(session=2) in tce.final_states
702assert not len(tce.cleanup_functions)
703assert tce.enter_state(EcuState(session=1), EcuState(session=2))
704assert transition_done
705assert len(tce.cleanup_functions)
706tce.cleanup_state()
707assert not len(tce.cleanup_functions)
708assert cleanup_done
709
710
711= TestCase execute StateGenerator with not callable cleanupfunc
712
# Set by transition_func() so the test can verify the transition was executed.
transition_done = False

def transition_func(sock, conf, kwargs):
    """Transition stub: validate its inputs, record the call, return True.

    :raises AssertionError: if ``kwargs["arg42"]`` is not "hello" or
        ``conf.TestCase43["local_kwarg"]`` is not "world".
    """
    assert kwargs["arg42"] == "hello"
    assert conf.TestCase43["local_kwarg"] == "world"
    global transition_done
    transition_done = True
    return True
721
class TestCase43(MyTestCase, StateGenerator):
    """State-generating test case that supplies a non-callable object (the
    string "fake") in the cleanup-function slot."""

    def get_new_edge(self, socket, config):
        """Return the (from, to) state pair after checking the configuration."""
        assert config.TestCase43["local_kwarg"] == "world"
        return EcuState(session=1), EcuState(session=2)

    def get_transition_function(self, socket, edge):
        """Return (transition function, its kwargs, "fake")."""
        assert edge[0] == EcuState(session=1)
        assert edge[1] == EcuState(session=2)
        return transition_func, {"arg42": "hello"}, "fake"

    def execute(self, socket, state, **kwargs):
        """Do nothing and return True."""
        return True
732
733
734tce = Scanner(MockSock(), test_cases=[TestCase43],
735              TestCase43_kwargs={"local_kwarg": "world"})
736
737assert len(tce.final_states) == 1
738
739tce.execute_test_case(TestCase43())
740
741assert len(tce.final_states) == 2
742assert EcuState(session=1) in tce.final_states and EcuState(session=2) in tce.final_states
743assert not len(tce.cleanup_functions)
744assert tce.enter_state(EcuState(session=1), EcuState(session=2))
745assert transition_done
746assert len(tce.cleanup_functions)
747tce.cleanup_state()
748assert not len(tce.cleanup_functions)
749
750= TestCase execute StateGenerator with cleanupfunc negative return
751
# Set by the stubs below so the test can verify that each one was called.
transition_done = False
cleanup_done = False

def transition_func(sock, conf, kwargs):
    """Transition stub: validate its inputs, record the call, return True.

    :raises AssertionError: if ``kwargs["arg42"]`` is not "hello" or
        ``conf.TestCase43["local_kwarg"]`` is not "world".
    """
    assert kwargs["arg42"] == "hello"
    assert conf.TestCase43["local_kwarg"] == "world"
    global transition_done
    transition_done = True
    return True

def cleanup_func(sock, conf):
    """Cleanup stub that reports failure: validate the configuration, record
    the call, and return False.

    :raises AssertionError: if ``conf.TestCase43["local_kwarg"]`` is not
        "world".
    """
    assert conf.TestCase43["local_kwarg"] == "world"
    global cleanup_done
    cleanup_done = True
    return False
767
class TestCase43(MyTestCase, StateGenerator):
    """State-generating test case reporting the edge session 1 -> session 2,
    with both a transition function and a cleanup function."""

    def get_new_edge(self, socket, config):
        """Return the (from, to) state pair after checking the configuration."""
        assert config.TestCase43["local_kwarg"] == "world"
        return EcuState(session=1), EcuState(session=2)

    def get_transition_function(self, socket, edge):
        """Return (transition function, its kwargs, cleanup function)."""
        assert edge[0] == EcuState(session=1)
        assert edge[1] == EcuState(session=2)
        return transition_func, {"arg42": "hello"}, cleanup_func

    def execute(self, socket, state, **kwargs):
        """Do nothing and return True."""
        return True
778
779
780tce = Scanner(MockSock(), test_cases=[TestCase43],
781              TestCase43_kwargs={"local_kwarg": "world"})
782
783assert len(tce.final_states) == 1
784
785tce.execute_test_case(TestCase43())
786
787assert len(tce.final_states) == 2
788assert EcuState(session=1) in tce.final_states and EcuState(session=2) in tce.final_states
789assert not len(tce.cleanup_functions)
790assert tce.enter_state(EcuState(session=1), EcuState(session=2))
791assert transition_done
792assert len(tce.cleanup_functions)
793tce.cleanup_state()
794assert not len(tce.cleanup_functions)
795assert cleanup_done
796
797
798= TestCase execute StateGenerator with cleanupfunc and path
799
# Flags set by the stubs below so the test can verify which ones were called.
transition_done1 = False
cleanup_done1 = False
transition_done2 = False
cleanup_done2 = False

# When True, transition_func2() reports failure.
transition_error = False


def transition_func1(sock, conf, kwargs):
    """Transition stub for the first edge: record the call, return True."""
    global transition_done1
    transition_done1 = True
    return True

def cleanup_func1(sock, conf):
    """Cleanup stub for the first edge: record the call, return True."""
    global cleanup_done1
    cleanup_done1 = True
    return True

def transition_func2(sock, conf, kwargs):
    """Transition stub for the second edge: record the call and return
    True unless the global ``transition_error`` is set."""
    global transition_done2
    transition_done2 = True
    return not transition_error

def cleanup_func2(sock, conf):
    """Cleanup stub for the second edge: record the call, return True."""
    global cleanup_done2
    cleanup_done2 = True
    return True
827
class TestCase43(MyTestCase, StateGenerator):
    """State generator for the edge session 1 -> session 2, using
    transition_func1 and cleanup_func1."""

    def get_new_edge(self, socket, config):
        """Return the (from, to) state pair."""
        return EcuState(session=1), EcuState(session=2)

    def get_transition_function(self, socket, edge):
        """Return (transition function, its kwargs, cleanup function)."""
        return transition_func1, {"arg42": "hello"}, cleanup_func1

    def execute(self, socket, state, **kwargs):
        """Do nothing and return True."""
        return True

class TestCase44(MyTestCase, StateGenerator):
    """State generator for the edge session 2 -> session 3, using
    transition_func2 and cleanup_func2, with no transition kwargs."""

    def get_new_edge(self, socket, config):
        """Return the (from, to) state pair."""
        return EcuState(session=2), EcuState(session=3)

    def get_transition_function(self, socket, edge):
        """Return (transition function, None, cleanup function)."""
        return transition_func2, None, cleanup_func2

    def execute(self, socket, state, **kwargs):
        """Do nothing and return True."""
        return True
843
# Set by reset_func() so the test can verify the reset handler was invoked.
reset_done = False

def reset_func():
    """Reset handler stub: record the call in the global ``reset_done``."""
    global reset_done
    reset_done = True
849
# Set by reconnect_func() so the test can verify the handler was invoked.
reconnect_done = False

def reconnect_func():
    """Reconnect handler stub: record the call in the global
    ``reconnect_done`` and return a new MockSock."""
    global reconnect_done
    reconnect_done = True
    return MockSock()
856
857tce = Scanner(MockSock(), test_cases=[TestCase43, TestCase44],
858              reset_handler=reset_func, reconnect_handler=reconnect_func)
859
860assert len(tce.final_states) == 1
861
862tce.execute_test_case(TestCase43())
863
864assert len(tce.final_states) == 2
865assert EcuState(session=1) in tce.final_states and EcuState(session=2) in tce.final_states
866tce.execute_test_case(TestCase44())
867assert len(tce.final_states) == 3
868assert EcuState(session=3) in tce.final_states and EcuState(session=2) in tce.final_states
869
870assert not len(tce.cleanup_functions)
871assert tce.enter_state_path([EcuState(session=1), EcuState(session=2), EcuState(session=3)])
872assert transition_done1
873assert transition_done2
874assert len(tce.cleanup_functions) == 2
875assert reconnect_done
876assert reset_done
877tce.cleanup_state()
878assert cleanup_done1
879assert cleanup_done2
880
# A path consisting only of session 3 is expected to be rejected with a
# Scapy_Exception. The failure is raised explicitly in the else branch rather
# than via `assert False`, so the check still fires when asserts are disabled.
try:
    tce.enter_state_path([EcuState(session=3)])
except Scapy_Exception:
    pass
else:
    raise AssertionError("enter_state_path() did not raise Scapy_Exception")
886
887= Test downrate edge
888
889transition_done1 = False
890cleanup_done1 = False
891
892tce = Scanner(MockSock(), test_cases=[TestCase43, TestCase44],
893              reset_handler=reset_func, reconnect_handler=reconnect_func)
894
895assert len(tce.final_states) == 1
896tce.execute_test_case(TestCase43())
897assert len(tce.final_states) == 2
898assert EcuState(session=1) in tce.final_states and EcuState(session=2) in tce.final_states
899tce.execute_test_case(TestCase44())
900assert len(tce.final_states) == 3
901assert EcuState(session=3) in tce.final_states and EcuState(session=2) in tce.final_states
902
903assert not len(tce.cleanup_functions)
904transition_error = True
905assert not tce.enter_state_path([EcuState(session=1), EcuState(session=2), EcuState(session=3)])
906assert transition_done1
907assert cleanup_done1
908assert len(tce.cleanup_functions) == 0
909assert tce.state_graph.weights[(EcuState(session=1), EcuState(session=2))] == 1
910assert tce.state_graph.weights[(EcuState(session=2), EcuState(session=3))] == 2
911
912
913= TestCase execute TestCaseGenerator
914
# Set by GeneratedTestCase.execute() so the test can verify that it ran.
tc_executed = False

class GeneratedTestCase(MyTestCase):
    """Test case produced by TestCase43.get_generated_test_case()."""

    def execute(self, socket, state, **kwargs):
        """Check that the per-class kwargs arrived, record the call and
        return True."""
        assert kwargs["local_kwarg"] == "world"
        global tc_executed
        tc_executed = True
        return True


class TestCase43(MyTestCase, TestCaseGenerator):
    """Test case generator that yields a GeneratedTestCase."""

    def execute(self, socket, state, **kwargs):
        """Do nothing and return True."""
        return True

    def get_generated_test_case(self):
        """Return a new GeneratedTestCase instance."""
        return GeneratedTestCase()
930
931
932tce = Scanner(MockSock(), test_cases=[TestCase43],
933              GeneratedTestCase_kwargs={"local_kwarg": "world"})
934
935assert len(tce.final_states) == 1
936assert len(tce.configuration.test_cases) == 1
937
938tce.execute_test_case(tce.configuration.test_cases[0])
939
940assert len(tce.configuration.test_cases) == 2
941
942tce.execute_test_case(tce.configuration.test_cases[1])
943
944assert tc_executed
945
946= TestCase scan timeout
947
# Set by GeneratedTestCase.execute(); the scan-timeout test expects it to
# stay False.
tc_executed = False

class GeneratedTestCase(MyTestCase):
    """Test case that records whether execute() was ever called."""

    def execute(self, socket, state, **kwargs):
        """Check that the per-class kwargs arrived, record the call and
        return True."""
        assert kwargs["local_kwarg"] == "world"
        global tc_executed
        tc_executed = True
        return True
956
957
# Scanner with a single test case and the kwargs addressed to it.
tce = Scanner(MockSock(), test_cases=[GeneratedTestCase],
              GeneratedTestCase_kwargs={"local_kwarg": "world"})

# One known final state and one configured test case before scanning.
assert len(tce.final_states) == 1
assert len(tce.configuration.test_cases) == 1

# NOTE(review): going by the test name, -1 is presumably a timeout that has
# already expired when the scan starts - confirm against the scan() signature.
tce.scan(-1)

# The scan must have returned without ever calling GeneratedTestCase.execute.
assert not tc_executed
967
968
969= TestCase scan
970
# Flag set by GeneratedTestCase.execute; checked after the scan.
tc_executed = False

class GeneratedTestCase(MyTestCase):
    # Test case that marks the given state as completed when executed.
    def execute(self, socket, state, **kwargs):
        """Check local_kwarg, set tc_executed, mark state completed."""
        global tc_executed
        forwarded = kwargs["local_kwarg"]
        assert forwarded == "world"
        tc_executed = True
        self._state_completed[state] = True
        return True
980
981
class TestCase43(MyTestCase, TestCaseGenerator):
    # Generator test case that marks each executed state as completed.
    def execute(self, socket, state, **kwargs):
        """Mark state as completed and return True."""
        self._state_completed[state] = True
        return True
    def get_generated_test_case(self):
        """Return a new GeneratedTestCase instance."""
        spawned = GeneratedTestCase()
        return spawned
988
989
# Keyword arguments addressed to GeneratedTestCase via the "<name>_kwargs" key.
scan_kwargs = {"local_kwarg": "world"}
tce = Scanner(MockSock(),
              test_cases=[TestCase43],
              GeneratedTestCase_kwargs=scan_kwargs)

# Starting point: one final state, one configured test case.
assert len(tce.final_states) == 1
assert len(tce.configuration.test_cases) == 1

tce.scan()

# The scan must have added and executed the generated test case and finished.
assert len(tce.configuration.test_cases) == 2
assert tc_executed
assert tce.scan_completed
1001
1002= Test supported responses
1003
class MyTestCase1(AutomotiveTestCase):
    # Test case with a fixed list of supported responses: two UDS_RDBIPR
    # responses and one UDS_NR response valid for sessions 0..254.
    _description = "MyTestCase1"
    _supported_kwargs = copy.copy(AutomotiveTestCase._supported_kwargs)
    _supported_kwargs['stop_event'] = (threading.Event, None)  # type: ignore
    @property
    def supported_responses(self):
        """Return the three EcuResponse objects this test case supports."""
        did2 = EcuResponse(
            EcuState(session=2),
            responses=UDS() / UDS_RDBIPR(dataIdentifier=2) / Raw(b"de"))
        did3 = EcuResponse(
            [EcuState(session=2), EcuState(security_level=6)],
            responses=UDS() / UDS_RDBIPR(dataIdentifier=3) / Raw(b"dea2"))
        negative = EcuResponse(
            EcuState(session=range(0,255)),
            responses=UDS() / UDS_NR(negativeResponseCode=0x7f, requestServiceId=0x13))
        return [did2, did3, negative]
1015
1016
class MyTestCase2(AutomotiveTestCase):
    # Test case with a fixed list of supported responses: two UDS_RDBIPR
    # responses for session 2 and one UDS_NR response for sessions 0..254.
    _description = "MyTestCase2"
    _supported_kwargs = copy.copy(AutomotiveTestCase._supported_kwargs)
    _supported_kwargs['stop_event'] = (threading.Event, None)  # type: ignore
    @property
    def supported_responses(self):
        """Return the three EcuResponse objects this test case supports."""
        did5 = EcuResponse(
            EcuState(session=2),
            responses=UDS() / UDS_RDBIPR(dataIdentifier=5) / Raw(b"deadbeef1"))
        did6 = EcuResponse(
            EcuState(session=2),
            responses=UDS() / UDS_RDBIPR(dataIdentifier=6) / Raw(b"deadbeef2"))
        negative = EcuResponse(
            EcuState(session=range(0,255)),
            responses=UDS() / UDS_NR(negativeResponseCode=0x10, requestServiceId=0x11))
        return [did5, did6, negative]
1028
1029
tce = Scanner(MockSock(), test_cases=[MyTestCase1(), MyTestCase2()])

resps = tce.supported_responses

# Both test cases contribute three responses each.
assert len(resps) == 6

# Entries 0..3 must not be negative responses (service 0x7f) ...
for _pos in range(4):
    assert resps[_pos].responses[0].service != 0x7f

# ... while the last two entries must be.
for _pos in (4, 5):
    assert resps[_pos].responses[0].service == 0x7f

# Expected payload of each of the first four entries, in order.
for _pos, _load in enumerate((b"dea2", b"deadbeef1", b"deadbeef2", b"de")):
    assert resps[_pos].responses[0].load == _load

# Expected requestServiceId of the two negative responses.
for _pos, _sid in ((4, 0x13), (5, 0x11)):
    assert resps[_pos].responses[0].requestServiceId == _sid
1049
1050= Test show testcases
1051
# Both calls only have to complete without raising. They are invoked directly
# so that a failure is reported with the original exception and traceback;
# wrapping them in "try / except Exception: assert False" would replace that
# information with a bare AssertionError.
tce.show_testcases()

tce.show_testcases_status()
1064
1065= Test StateGeneratingServiceEnumerator
1066
class TestCase43(MyTestCase, StateGeneratingServiceEnumerator):
    # Enumerator with one canned result: a UDS_DSC request answered by a
    # UDS_DSCPR response, recorded for session 1.
    def execute(self, socket, state, **kwargs):
        """No-op; always returns True."""
        return True
    @property
    def results(self):  # type: () -> List[_AutomotiveTestCaseScanResult]
        """Return the single canned scan result."""
        origin_state = EcuState(session=1)
        request = UDS()/UDS_DSC(b"\x03")
        response = UDS()/UDS_DSCPR(b"\x03")
        canned = _AutomotiveTestCaseScanResult(origin_state, request, response, 1.1, 1.2)
        return [canned]
1073
tce = Scanner(MockSock(), test_cases=[TestCase43])

assert len(tce.final_states) == 1

tce.execute_test_case(TestCase43())

# The canned result must have produced session 3 as an additional state.
assert len(tce.final_states) == 2
assert EcuState(session=1) in tce.final_states
assert EcuState(session=3) in tce.final_states

# Look up what the state graph stored for the edge session 1 -> session 3.
session_edge = (EcuState(session=1), EcuState(session=3))
tf, args, cf = tce.state_graph.get_transition_tuple_for_edge(session_edge)

# A transition function with two arguments is expected, but no cleanup function.
assert tf is not None
assert cf is None
assert len(args) == 2
assert args["req"] == UDS()/UDS_DSC(b"\x03")
transition_desc = args["desc"]
assert "diagnosticSessionType" in transition_desc
assert "extendedDiagnosticSession" in transition_desc

# Entering the new state is expected to return a falsy value here.
assert not tce.enter_state(EcuState(session=1), EcuState(session=3))
1092