• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
% Regression tests for the Ecu utility

# More information at http://www.secdev.org/projects/UTscapy/

############
############
7
8+ Setup
9~ conf command
10
11= Load modules
12
13import copy
14import itertools
15
16load_contrib("isotp", globals_dict=globals())
17load_contrib("automotive.uds", globals_dict=globals())
18load_contrib("automotive.gm.gmlan", globals_dict=globals())
19load_layer("can", globals_dict=globals())
20conf.contribs["CAN"]["swap-bytes"] = True
21
22= Load Ecu module
23
24load_contrib("automotive.ecu", globals_dict=globals())
25from scapy.contrib.automotive.uds_ecu_states import *
26from scapy.contrib.automotive.uds_logging import *
27from scapy.contrib.automotive.gm.gmlan_ecu_states import *
28from scapy.contrib.automotive.gm.gmlan_logging import *
29
30+ EcuState Basic checks
31
32= Check EcuState basic functionality
33
# Fill an empty EcuState through item assignment and pin its repr().
# The expected string lists "securityAccess" ahead of "session" although
# "session" was assigned first.
state = EcuState()
state["session"] = 2
state["securityAccess"] = 4

state_repr = repr(state)
print(state_repr)
assert state_repr == "securityAccess4session2"
39
40= More complex tests
41
# A keyword handed to the constructor is readable as an attribute ...
state = EcuState(ses=4)
assert state.ses == 4

# ... and can be overwritten with a plain attribute assignment.
state.ses = 5
assert state.ses == 5
46
47= Even more complex tests
48
# Attribute access and item access are mixed on the same key.
state = EcuState(myinfo="42")

state.ses = 5
assert state.ses == 5

# A None written through item assignment is visible as an attribute.
state["ses"] = None
assert state.ses is None

state.ses = 5
assert 5 == state.ses

# The constructor keyword is still present and appears first in repr().
assert "42" == state.myinfo
assert repr(state) == "myinfo42ses5"
62
63= Delete Attribute Test
64
# After `del state.ses` the attribute must be gone, while attributes that
# were not deleted keep their value.
state = EcuState(myinfo="42")

state.ses = 5
assert state.ses == 5

del state.ses

# Reading the deleted attribute has to raise; both exception types are
# accepted.  Only the lookup sits inside the try body, and a lookup that
# unexpectedly succeeds fails the test with a message naming the value.
try:
    leftover = state.ses
except (KeyError, AttributeError):
    pass
else:
    assert False, "state.ses still readable after del: %r" % (leftover,)

# Verified unconditionally rather than only inside the exception handler.
assert state.myinfo == "42"
77
78= Copy tests
79
# copy.copy() has to yield an independent object: a write to the copy must
# not show up in the source, while values present at copy time carry over.
state = EcuState(myinfo="42")
state.ses = 5

ns = copy.copy(state)
ns.ses = 6

assert ns.ses == 6
assert state.ses == 5
assert ns.myinfo == "42"
90
91
92= Move tests
93
# Plain assignment only binds a second name to the same object, so a write
# through `ns` is observable through `state` as well.
state = EcuState(myinfo="42")
state.ses = 5

ns = state
ns.ses = 6

assert ns.ses == 6
assert state.ses == 6
assert ns.myinfo == "42"
104
105= equal tests
106
# Equality and hash are checked side by side: identical content compares
# equal and hashes alike; a differing value or an extra key breaks both.
state = EcuState(myinfo="42")
state.ses = 5

ns = copy.copy(state)
assert state == ns
assert hash(state) == hash(ns)

# Different value stored under an existing key.
ns.ses = 6
assert state != ns
assert hash(state) != hash(ns)

# Restoring the value restores equality.
ns.ses = 5
assert state == ns
assert hash(state) == hash(ns)

# An additional key on one side only.
ns.sa = 5
assert state != ns
assert hash(state) != hash(ns)
129
130
131= hash tests
132
# hash() alone: it follows the content of the state through a sequence of
# modifications applied to a copy.
state = EcuState(myinfo="42")
state.ses = 5

ns = copy.copy(state)
assert hash(state) == hash(ns)

# Changed value -> different hash.
ns.ses = 6
assert hash(state) != hash(ns)

# Value restored -> same hash again.
ns.ses = 5
assert hash(state) == hash(ns)

# Extra key -> different hash.
ns.sa = 5
assert hash(state) != hash(ns)
151
152= command tests
153
# command() is expected to render a constructor call that lists every
# key/value pair of the state.
state = EcuState(myinfo="42")
state.ses = 5

# command() is invoked twice; only the second result is compared.
state.command()
assert "EcuState(myinfo='42', ses=5)" == state.command()
159
160= less than tests
161
# Ordering of two states while keys are added and changed step by step.
# Each assert pins the expected ordering for the values listed above it.
s1 = EcuState()
s2 = EcuState()

# s1: a=1            s2: a=2
s1.a = 1
s2.a = 2
assert s1 < s2

# s1: a=1, b=4       s2: a=2
# NOTE(review): s1 now holds a key s2 lacks and is expected to compare
# greater -- confirm the exact rule against EcuState's ordering code.
s1.b = 4
assert s1 > s2

# s1: a=1, b=4       s2: a=2, b=1
s2.b = 1
assert s1 < s2

# s1: a=2, b=4       s2: a=2, b=1
s1.a = 2
assert s1 > s2
181
182= less than tests 2
183
# Ordering two states whose values under the same key are a str and an int
# is expected to raise TypeError.
s1 = EcuState()
s2 = EcuState()

s1.c = "x"
s2.c = 4

# `exception` records whether the comparison raised TypeError.
try:
    assert s1 < s2
except TypeError:
    exception = True
else:
    exception = False

assert exception
197
198= less than tests 3
199
# Both states hold the keys "A" and "a" with swapped values:
#   s1: A=1, a=2        s2: A=2, a=1
# s1 is expected to sort first.
s1 = EcuState()
s2 = EcuState()

s1.A = 1
s1.a = 2

s2.A = 2
s2.a = 1

assert s1 < s2
211
212= less than tests 4
213
# The states share key "A" only; the second key differs ("a" vs "b"):
#   s1: A=1, a=2        s2: A=2, b=100
# s1 is expected to sort first.
s1 = EcuState()
s2 = EcuState()

s1.A = 1
s1.a = 2

s2.A = 2
s2.b = 100

assert s1 < s2
225
226= less than tests 5
227
# Same key layout as the previous case but with a larger "A" on s1:
#   s1: A=100, a=2      s2: A=2, b=100
# s1 is expected to sort after s2.
s1 = EcuState()
s2 = EcuState()

s1.A = 100
s1.a = 2

s2.A = 2
s2.b = 100

assert s1 > s2

# A state is neither greater nor less than itself.
assert not s1 > s1
assert not s1 < s1
241
242= less than tests 6
243
# No key in common: s1 uses only upper-case keys, s2 only lower-case ones.
#   s1: A=100, B=200    s2: a=2, b=1
# s1 is expected to sort first despite its larger values.
s1 = EcuState()
s2 = EcuState()

s1.A = 100
s1.B = 200

s2.a = 2
s2.b = 1

assert s1 < s2
255
256= contains test
257
# Membership with a list of alternatives on one side.

# A scalar value that is one of the listed alternatives is contained.
s1 = EcuState(ses=[1, 2, 3])
s2 = EcuState(ses=1)

assert s1 != s2
assert s2 in s1
assert s1 not in s2

# A scalar value missing from the list is not contained.
s1 = EcuState(ses=[2, 3])
s2 = EcuState(ses=1)

assert s1 != s2
assert s2 not in s1
assert s1 not in s2

# s1 additionally carries `security`, which s2 lacks: not contained.
s1 = EcuState(ses=[1, 2, 3], security=5)
s2 = EcuState(ses=1)

assert s1 != s2
assert s2 not in s1
assert s1 not in s2
279
# Membership with range() values; `security` lists None as an alternative.

# s2 has no `security` entry and is still contained.
s1 = EcuState(ses=range(4), security=[None, 5])
s2 = EcuState(ses=1)

assert s1 != s2
assert s2 in s1
assert s1 not in s2

# A smaller range inside a larger one; s2 also sorts before s1.
s1 = EcuState(ses=range(4), security=[None, 5])
s2 = EcuState(ses=range(2))

assert s1 != s2
assert s2 < s1
assert s2 in s1
assert s1 not in s2

# Smaller range plus one of the listed security values.
s1 = EcuState(ses=range(4), security=[None, 5])
s2 = EcuState(ses=range(2), security=5)

assert s1 != s2
assert s2 in s1
assert s1 not in s2

# range(5) reaches beyond range(4): contained in neither direction.
s1 = EcuState(ses=range(4), security=[None, 5])
s2 = EcuState(ses=range(5))

assert s1 != s2
assert s2 not in s1
assert s1 not in s2
308
# Membership with nested alternatives (lists that contain ranges and further
# lists).  The result of _expand() is printed for both states in each case.

# A range nested inside the list of security alternatives.
s1 = EcuState(ses=range(4), security=[None, range(5)])
s2 = EcuState(ses=3)

print(s1._expand())
print(s2._expand())

assert s1 != s2
assert s2 in s1
assert s1 not in s2

# The value 10 only occurs in the innermost list.
s1 = EcuState(ses=range(4),
              security=[None, range(5), [5, 7, range(4), [range(10), 10]]])
s2 = EcuState(ses=3, security=10)

print(s1._expand())
print(s2._expand())

assert s1 != s2
assert s2 in s1
assert s1 not in s2

# A string among otherwise numeric alternatives.
s1 = EcuState(ses=range(4),
              security=[None, range(5), [5, 7, range(4), [range(10), "B"]]])
s2 = EcuState(ses=3, security="B")

print(s1._expand())
print(s2._expand())

assert s1 != s2
assert s2 in s1
assert s1 not in s2

# A string that is not among the alternatives.
s1 = EcuState(ses=range(4),
              security=[None, range(5), [5, 7, range(4), [range(10), "B"]]])
s2 = EcuState(ses=3, security="C")

print(s1._expand())
print(s2._expand())

assert s1 != s2
assert s2 not in s1
assert s1 not in s2
348
# A range for one key combined with a scalar for the other.
s1 = EcuState(ses=range(3), security=5)
s2 = EcuState(ses=1, security=5)

assert s1 != s2
assert s2 in s1
assert s1 not in s2

# A generator expression is accepted as the set of alternatives
# (it yields 1, 3, 5, 7, 9).
s1 = EcuState(ses=range(3), security=(x for x in range(1, 10, 2)))
s2 = EcuState(ses=1, security=5)

assert s1 != s2
assert s2 in s1
assert s1 not in s2

# Two states built from equal lists are equal and contain each other.
s1 = EcuState(ses=[1, 2, 3])
s2 = EcuState(ses=[1, 2, 3])

assert s1 in s2
assert s2 in s1
assert s1 == s2

# The same holds for two states built from equal scalars.
s1 = EcuState(ses=1)
s2 = EcuState(ses=1)

assert s1 in s2
assert s2 in s1
assert s1 == s2
376
# Every single combination drawn from the alternatives must be contained in
# the combined state, never the other way round.

# Alternatives given as ranges.
s1 = EcuState(ses=range(3), security=range(5))
for ses, sec in itertools.product(range(3), range(5)):
    s2 = EcuState(ses=ses, security=sec)
    assert s1 != s2
    assert s2 in s1
    assert s1 not in s2

# Alternatives given as lists of integers.
s1 = EcuState(ses=[0, 1, 2], security=[43, 44])
for ses, sec in itertools.product(range(3), range(43, 45)):
    s2 = EcuState(ses=ses, security=sec)
    assert s1 != s2
    assert s2 in s1
    assert s1 not in s2

# Alternatives given as strings; the reverse membership check is allowed to
# raise TypeError here.
s1 = EcuState(ses=[0, 1, 2], security=["a", "b"])
for ses, sec in itertools.product(range(3), "ab"):
    s2 = EcuState(ses=ses, security=sec)
    assert s1 != s2
    assert s2 in s1
    try:
        assert s1 not in s2
    except TypeError:
        pass
401
# A plain Python list of states: the ordinary list membership test finds
# s2, while the list itself is not contained in the state.
s1 = [EcuState(ses=1), EcuState(ses=2), EcuState(ses=3)]
s2 = EcuState(ses=3)

assert s2 in s1
assert s1 not in s2

# Different string values: unequal, contained in neither direction.
s1 = EcuState(ses=1, sa="SEC")
s2 = EcuState(ses=1, sa="SOC")

assert s1 not in s2
assert s2 not in s1
assert s1 != s2

# Identical string values: equal, contained in both directions.
s1 = EcuState(ses=1, sa="SEC")
s2 = EcuState(ses=1, sa="SEC")

assert s1 in s2
assert s2 in s1
assert s1 == s2

# A string matched against a list of strings.
s1 = EcuState(ses=1, sa="SEC")
s2 = EcuState(ses=1, sa=["SEC", "SOL"])

assert s1 in s2
assert s2 not in s1
assert s1 != s2

# A bytes value matched against a list mixing bytes and str.
s1 = EcuState(ses=1, sa=b"SEC")
s2 = EcuState(ses=1, sa=[b"SEC", "SOL"])

assert s1 in s2
assert s2 not in s1
assert s1 != s2
439
440
441+ EcuState modification tests
442
443= Basic definitions for tests
444
# Dummy layer with one integer field.  No state modifier is attached to it;
# it only serves as padding in front of myPack2 in the checks of this test.
class myPack1(Packet):
    fields_desc = [IntField("fakefield", 1)]
449
# Dummy layer whose `statefield` value (default 1) is the value that the
# modifier registered for this class writes into an EcuState.
class myPack2(Packet):
    fields_desc = [IntField("statefield", 1)]
454
@EcuState.extend_pkt_with_modifier(myPack2)
def modify_ecu_state(self, req, ecustate):
    # type: (Packet, Packet, EcuState) -> None
    """Store this layer's ``statefield`` as ``ecustate.state``.

    Registered for ``myPack2`` through the decorator above.  ``req`` is
    part of the modifier signature but is not used here.
    """
    ecustate.state = self.statefield
459
pkt = myPack1() / myPack2()
st = EcuState()

# A fresh state has no `state` attribute yet; reading it must raise.
try:
    assert st.state == 1
except AttributeError:
    exception = True
else:
    exception = False

assert exception

# Only a packet that contains a myPack2 layer counts as a modifier packet.
assert EcuState.is_modifier_pkt(pkt)
assert not EcuState.is_modifier_pkt(myPack1())

# Applying the packet yields a state that differs from `st` and carries the
# default statefield value.
mod = EcuState.get_modified_ecu_state(pkt, Raw(), st)
assert mod != st
assert mod.state == 1

# The modifier layer is found behind several non-modifier layers.
pkt2 = myPack1() / myPack1() / myPack1() / myPack2(statefield=5)
mod2 = EcuState.get_modified_ecu_state(pkt2, Raw(), mod)

assert mod != mod2
assert mod < mod2

# Two modifier layers (4, then 5) in one packet.  With mod.state forced to
# 5 afterwards, mod must compare greater than the result.
# NOTE(review): this implies the resulting state is below 5, presumably 4
# from the first myPack2 layer -- confirm against get_modified_ecu_state.
pkt2 = (myPack1() / myPack1() / myPack1()
        / myPack2(statefield=4) / myPack2(statefield=5))
mod2 = EcuState.get_modified_ecu_state(pkt2, Raw(), mod)
mod.state = 5
assert mod != mod2
assert mod > mod2
488
489+ EcuResponse tests
490
491= Basic checks
492
# A response bound to session 1 supports that state only; the empty state
# and a different session are both rejected.
resp = EcuResponse(EcuState(session=1), UDS() / UDS_DSCPR(b"\x03"))

assert not resp.supports_state(EcuState())
assert not resp.supports_state(EcuState(session=2))
assert resp.supports_state(EcuState(session=1))

# The positive response answers the matching session control request.
assert resp.answers(UDS() / UDS_DSC(b"\x03"))
499
500= Command checks
501
# command() must produce source text that rebuilds an equal EcuResponse.
resp = EcuResponse(EcuState(session=1), UDS() / UDS_DSCPR(b"\x03"))
cmd = resp.command()
print(cmd)

# eval() only ever sees the string generated two lines above.
resp1 = eval(cmd)
assert resp1 == resp
508
509= Command checks 2
510
# Same round trip through command(), now with two states and two responses.
# Both packets are dissected from their byte representation first.
p1 = UDS(bytes(UDS() / UDS_NR(b"\x10\x00")))
p2 = UDS(bytes(UDS() / UDS_DSCPR(b"\x03")))

resp = EcuResponse([EcuState(session=1), EcuState(session=3)], [p1, p2])
cmd = resp.command()
print(cmd)

# eval() only ever sees the string generated by command() above.
resp1 = eval(cmd)

# Original and rebuilt object support at least one of each other's states.
assert any(resp1.supports_state(s) for s in resp.states)
assert any(resp.supports_state(s) for s in resp1.states)

# Same number of responses, identical on the wire, and equal as a whole.
assert len(resp.responses) == len(resp1.responses)
assert all(bytes(x) == bytes(y)
           for x, y in zip(resp.responses, resp1.responses))
assert resp1 == resp
524
525= Compare check
526
p1 = UDS(bytes(UDS() / UDS_NR(b"\x10\x00")))
p2 = UDS(bytes(UDS() / UDS_DSCPR(b"\x03")))

# Reference object: two states, two responses.
resp = EcuResponse([EcuState(session=1), EcuState(session=3)], [p1, p2])

# Same responses, but only one of the two reference states.
resp1 = EcuResponse([EcuState(session=1)], [p1, p2])
# Same responses, but a state the reference does not have.
resp2 = EcuResponse([EcuState(session=2)], [p1, p2])
# A reference state, but only one of the two responses.
resp3 = EcuResponse([EcuState(session=1)], [p2])

assert resp == resp1
assert resp != resp2
assert resp != resp3
541
542= Key response check
543
# Both the EcuResponse as a whole and its key_response must answer the
# DiagnosticSessionControl request.
req = UDS() / UDS_DSC(b"\x03")
p1 = UDS(bytes(UDS() / UDS_NR(b"\x10\x00")))
p2 = UDS(bytes(UDS() / UDS_DSCPR(b"\x03")))

resp = EcuResponse([EcuState(session=1), EcuState(session=3)], [p1, p2])

assert resp.answers(req)
assert resp.key_response.answers(req)
552
553
554
555+ Ecu Simple operations
556
557= Log all commands applied to an Ecu
558
# Five DiagnosticSessionControl requests, session types 3, 4, 5, 6 and 2.
msgs = [UDS(service=16) / UDS_DSC(diagnosticSessionType=session_type)
        for session_type in (3, 4, 5, 6, 2)]

ecu = Ecu(verbose=False, store_supported_responses=False)
ecu.update(PacketList(msgs))

# Every request leaves one (timestamp, value) entry in the log.
session_log = ecu.log["DiagnosticSessionControl"]
assert len(session_log) == 5

# First entry belongs to session type 3 ...
timestamp, value = session_log[0]
assert timestamp > 0
assert value == "extendedDiagnosticSession"

# ... and the last one to session type 2.
assert session_log[-1][1] == "programmingSession"
572
573
574= Trace all commands applied to an Ecu
575
# One request/response pair is replayed with verbose tracing switched on and
# logging switched off; afterwards the Ecu must be in session 3.
#
# sessionParameterRecord holds the four bytes 0x00 0x32 0x01 0xf4.  The
# escapes need single backslashes: b'\\x00...' would be 13 bytes of ASCII
# text rather than the 4-byte record.
msgs = [
    UDS(service=16) / UDS_DSC(diagnosticSessionType=3),
    UDS(service=80) / UDS_DSCPR(diagnosticSessionType=3,
                                sessionParameterRecord=b'\x002\x01\xf4'),
]

ecu = Ecu(verbose=True, logging=False, store_supported_responses=False)
ecu.update(PacketList(msgs))
assert ecu.state.session == 3
582
583
584= Generate supported responses of an Ecu
585
# Request (session 3), its positive response, then a second request
# (session 4) that is never answered.
#
# sessionParameterRecord holds the four bytes 0x00 0x32 0x01 0xf4.  The
# escapes need single backslashes: b'\\x00...' would be 13 bytes of ASCII
# text rather than the 4-byte record.
msgs = [
    UDS(service=16) / UDS_DSC(diagnosticSessionType=3),
    UDS(service=80) / UDS_DSCPR(diagnosticSessionType=3,
                                sessionParameterRecord=b'\x002\x01\xf4'),
    UDS(service=16) / UDS_DSC(diagnosticSessionType=4),
]

ecu = Ecu(verbose=False, logging=False, store_supported_responses=True)
ecu.update(PacketList(msgs))
supported_responses = ecu.supported_responses
unanswered_packets = ecu.unanswered_packets

# The answered pair switched the session and produced one stored response;
# the trailing request is kept as unanswered.
assert ecu.state.session == 3
assert len(supported_responses) == 1
assert len(unanswered_packets) == 1

# The stored response was recorded for the initial (empty) state.
response = supported_responses[0]
print(response.command())
assert response.supports_state(EcuState())
assert response.key_response.service == 80
assert unanswered_packets[0].diagnosticSessionType == 4
603
604
605+ Ecu Advanced checks
606
607= Analyze multiple UDS messages
608
# Read the first 50 reassembled UDS messages from the recorded trace.
udsmsgs = sniff(
    offline=scapy_path("test/pcaps/ecu_trace.pcap.gz"),
    session=ISOTPSession(use_ext_address=False, basecls=UDS),
    count=50,
    timeout=3)

assert len(udsmsgs) == 50

# Feed the complete list into an Ecu created with default settings.
ecu = Ecu()
ecu.update(udsmsgs)

# Response 0: recorded for the initial (empty) state, enters session 3.
response = ecu.supported_responses[0]
assert response.supports_state(EcuState())
assert response.key_response.service == 80
assert response.key_response.diagnosticSessionType == 3

# Response 1: recorded while in session 3, enters session 2.
response = ecu.supported_responses[1]
assert response.supports_state(EcuState(session=3))
assert response.key_response.service == 80
assert response.key_response.diagnosticSessionType == 2

# Response 4: recorded in session 2 with security level 18.
response = ecu.supported_responses[4]
print(response)
state = EcuState(session=2, security_level=18)
print(state)
assert response.supports_state(state)
assert response.key_response.service == 110
assert response.key_response.dataIdentifier == 61786

assert len(ecu.log["TransferData"]) == 2
633
634+ EcuSession tests
635
636= Analyze on the fly with EcuSession
637
# Same trace and same expectations as the offline Ecu.update() test, but
# the packets pass through an EcuSession stacked under the ISOTPSession
# while they are being sniffed.
session = EcuSession()

with PcapReader(scapy_path("test/pcaps/ecu_trace.pcap.gz")) as sock:
    udsmsgs = sniff(
        session=ISOTPSession(supersession=session, use_ext_address=False,
                             basecls=UDS),
        count=50, opened_socket=sock, timeout=3)

assert len(udsmsgs) == 50

# The Ecu object is owned by the session.
ecu = session.ecu

# Response 0: recorded for the initial (empty) state, enters session 3.
response = ecu.supported_responses[0]
assert response.supports_state(EcuState())
assert response.key_response.service == 80
assert response.key_response.diagnosticSessionType == 3

# Response 1: recorded while in session 3, enters session 2.
response = ecu.supported_responses[1]
assert response.supports_state(EcuState(session=3))
assert response.key_response.service == 80
assert response.key_response.diagnosticSessionType == 2

# Response 4: recorded in session 2 with security level 18.
response = ecu.supported_responses[4]
print(response)
state = EcuState(session=2, security_level=18)
print(state)
assert response.supports_state(state)
assert response.key_response.service == 110
assert response.key_response.dataIdentifier == 61786

assert len(ecu.log["TransferData"]) == 2
662
663
664= Analyze on the fly with EcuSession GMLAN1
665
# The GMLAN trace is consumed in four chunks from one open reader.  Every
# chunk uses a new ISOTPSession on top of the same EcuSession, so the Ecu
# state accumulates and is checked after each chunk.
session = EcuSession()

conf.contribs['CAN']['swap-bytes'] = True

with PcapReader(scapy_path("test/pcaps/gmlan_trace.pcap.gz")) as sock:
    # Chunk 1: two messages (the only call that passes a timeout).
    gmlanmsgs = sniff(
        session=ISOTPSession(supersession=session,
                             rx_id=[0x241, 0x641, 0x101],
                             basecls=GMLAN),
        count=2, opened_socket=sock, timeout=3)
    ecu = session.ecu
    print("Check 1 after change to diagnostic mode")
    assert len(ecu.supported_responses) == 1
    assert ecu.state == EcuState(session=3)
    # Chunk 2: six more messages.
    gmlanmsgs = sniff(
        session=ISOTPSession(supersession=session,
                             rx_id=[0x241, 0x641, 0x101],
                             basecls=GMLAN),
        count=6, opened_socket=sock)
    ecu = session.ecu
    print("Check 2 after some more messages were read1")
    assert len(ecu.supported_responses) == 3
    print("Check 2 after some more messages were read2")
    assert ecu.state.session == 3
    print("assert 1")
    assert ecu.state.communication_control == 1
    # Chunk 3: two messages, the session changes to 2.
    gmlanmsgs = sniff(
        session=ISOTPSession(supersession=session,
                             rx_id=[0x241, 0x641, 0x101],
                             basecls=GMLAN),
        count=2, opened_socket=sock)
    ecu = session.ecu
    print("Check 3 after change to programming mode (bootloader)")
    assert len(ecu.supported_responses) == 4
    assert ecu.state.session == 2
    assert ecu.state.communication_control == 1
    # Chunk 4: six messages, the security level becomes 2.
    gmlanmsgs = sniff(
        session=ISOTPSession(supersession=session,
                             rx_id=[0x241, 0x641, 0x101],
                             basecls=GMLAN),
        count=6, opened_socket=sock)
    ecu = session.ecu
    print("Check 4 after gaining security access")
    assert len(ecu.supported_responses) == 6
    assert ecu.state.session == 2
    assert ecu.state.security_level == 2
    assert ecu.state.communication_control == 1
697
698= Analyze on the fly with EcuSession GMLAN logging test
699
# Logging stays at its default here while storing of supported responses is
# switched off, so only ecu.log is filled.
session = EcuSession(verbose=False, store_supported_responses=False)

conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4
conf.contribs['CAN']['swap-bytes'] = True

conf.debug_dissector = True
gmlanmsgs = sniff(
    offline=scapy_path("test/pcaps/gmlan_trace.pcap.gz"),
    session=ISOTPSession(supersession=session,
                         rx_id=[0x241, 0x641, 0x101],
                         basecls=GMLAN),
    count=200,
    timeout=6)

ecu = session.ecu
assert len(ecu.supported_responses) == 0

# For each of these services the number of log entries must equal the
# number of sniffed messages whose GMLAN.service renders to that name.
for service in ("TransferData", "RequestDownload", "ReadDataByIdentifier"):
    seen = len([m for m in gmlanmsgs
                if m.sprintf("%GMLAN.service%") == service])
    assert seen == len(ecu.log[service])

assert len(ecu.log["SecurityAccess"]) == 2
assert len(ecu.log["SecurityAccessPositiveResponse"]) == 2

# First element of the value logged for the last TransferData message.
assert ecu.log["TransferData"][-1][1][0] == "downloadAndExecuteOrExecute"
721