• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1% Regression tests for gmlanutils
2~ scanner
3
4+ Configuration
5~ conf
6
7
8= Imports
9
10from scapy.contrib.automotive import log_automotive
11from test.testsocket import TestSocket, cleanup_testsockets
12import logging
13
14############
15############
16+ Load general modules
17
18= Load contribution layer
19
20load_layer("can", globals_dict=globals())
21load_contrib("automotive.gm.gmlan", globals_dict=globals())
22load_contrib("automotive.gm.gmlanutils", globals_dict=globals())
23
24log_automotive.setLevel(logging.DEBUG)
25
26= Define test sockets
27
28isotpsock2 = TestSocket(GMLAN)
29isotpsock = TestSocket(GMLAN)
30isotpsock2.pair(isotpsock)
31
32##############################################################################
33+ GMLAN_RequestDownload Tests
34##############################################################################
35= Positive, immediate positive response
36
37ecusimSuccessfullyExecuted = False
38started = threading.Event()
39
def ecusim():
    """Simulated ECU: expect one RequestDownload request and acknowledge it."""
    global ecusimSuccessfullyExecuted
    # Start pessimistic: an exception raised in this worker thread is not
    # seen by the test, so the flag must not default to True.
    ecusimSuccessfullyExecuted = False
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
    if len(requ) == 0:
        # Nothing was captured within the timeout, so there is nothing to answer.
        return
    expected = GMLAN()/GMLAN_RD(memorySize=4)
    # Publish the verdict before answering so the flag is final by the time
    # the client receives the response.
    ecusimSuccessfullyExecuted = bytes(requ[0]) == bytes(expected)
    # Response used by the "immediate positive response" scenario; it is sent
    # whether or not the request matched, as before.
    ack = b"\x74"
    isotpsock2.send(ack)
51
52thread = threading.Thread(target=ecusim)
53sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
54thread.start()
55started.wait(timeout=5)
56res = GMLAN_RequestDownload(isotpsock, 4, timeout=1) == True
57thread.join(timeout=5)
58
59assert res
60assert ecusimSuccessfullyExecuted == True
61
62= Negative, immediate negative response
63
64started = threading.Event()
def ecusim():
    """Simulated ECU: answer the first sniffed request with a negative response."""
    isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
    # requestServiceId 0x34 with returnCode 0x22, sent as a single packet
    isotpsock2.send(GMLAN() / GMLAN_NR(requestServiceId=0x34, returnCode=0x22))
69
70thread = threading.Thread(target=ecusim)
71sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
72thread.start()
73started.wait(timeout=5)
74res = GMLAN_RequestDownload(isotpsock, 4, timeout=1) == False
75thread.join(timeout=5)
76assert res
77
78= Negative, timeout
79assert GMLAN_RequestDownload(isotpsock, 4, timeout=0.01) == False
80
81############################ Response pending
82= Positive, after response pending
83started = threading.Event()
84
def ecusim():
    """Simulated ECU: reply with one pending response, then the positive response."""
    isotpsock2.sniff(count=1, timeout=2, started_callback=started.set)
    # returnCode 0x78 is the "response pending" case this scenario exercises
    replies = (GMLAN() / GMLAN_NR(requestServiceId=0x34, returnCode=0x78), b"\x74")
    for reply in replies:
        isotpsock2.send(reply)
91
92thread = threading.Thread(target=ecusim)
93sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
94thread.start()
95started.wait(timeout=5)
96res = GMLAN_RequestDownload(isotpsock, 4, timeout=2) == True
97thread.join(timeout=5)
98assert res
99
100= Positive, hold response pending for several messages
101tout = 0.1
102repeats = 4
103started = threading.Event()
104
def ecusim():
    """Simulated ECU: send `repeats` pending responses `tout` seconds apart, then accept."""
    isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
    # returnCode 0x78 is the "response pending" case this scenario holds open
    pending = GMLAN() / GMLAN_NR(requestServiceId=0x34, returnCode=0x78)
    remaining = repeats
    while remaining > 0:
        isotpsock2.send(pending)
        time.sleep(tout)
        remaining -= 1
    # final positive response ends the pending phase
    isotpsock2.send(b"\x74")
113
114thread = threading.Thread(target=ecusim)
115sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
116thread.start()
117started.wait(timeout=5)
118starttime = time.time() # may be inaccurate -> on some systems only seconds precision
119result = GMLAN_RequestDownload(isotpsock, 4, timeout=repeats*tout+0.5)
120endtime = time.time() + 1
121thread.join(timeout=5)
122assert result
123print(endtime - starttime)
124print(tout * (repeats - 1))
125assert (endtime - starttime) >= tout * (repeats - 1)
126
127
128= Negative, negative response after response pending
129started = threading.Event()
130def ecusim():
131    isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
132    pending = GMLAN()/GMLAN_NR(requestServiceId=0x34, returnCode=0x78)
133    isotpsock2.send(pending)
134    nr = GMLAN()/GMLAN_NR(requestServiceId=0x34, returnCode=0x22)
135    isotpsock2.send(nr)
136
137thread = threading.Thread(target=ecusim)
138sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
139thread.start()
140started.wait(timeout=5)
141res = GMLAN_RequestDownload(isotpsock, 4, timeout=0.1) == False
142thread.join(timeout=5)
143assert res
144
145
146= Negative, timeout after response pending
147started = threading.Event()
148def ecusim():
149    isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
150    pending = GMLAN()/GMLAN_NR(requestServiceId=0x34, returnCode=0x78)
151    isotpsock2.send(pending)
152
153thread = threading.Thread(target=ecusim)
154sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
155thread.start()
156started.wait(timeout=5)
157res = GMLAN_RequestDownload(isotpsock, 4, timeout=0.1) == False
158thread.join(timeout=5)
159assert res
160
161
162
163= Positive, pending message from different service interferes while pending
164started = threading.Event()
165def ecusim():
166    isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
167    pending = GMLAN()/GMLAN_NR(requestServiceId=0x34, returnCode=0x78)
168    isotpsock2.send(pending)
169    wrongservice = GMLAN()/GMLAN_NR(requestServiceId=0x36, returnCode=0x78)
170    isotpsock2.send(wrongservice)
171    isotpsock2.send(pending)
172    ack = b"\x74"
173    isotpsock2.send(ack)
174
175thread = threading.Thread(target=ecusim)
176sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
177thread.start()
178started.wait(timeout=5)
179res = GMLAN_RequestDownload(isotpsock, 4, timeout=0.1) == True
180thread.join(timeout=5)
181assert res
182
183= Positive, negative response from different service interferes while pending
184started = threading.Event()
185
186def ecusim():
187    isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
188    pending = GMLAN()/GMLAN_NR(requestServiceId=0x34, returnCode=0x78)
189    isotpsock2.send(pending)
190    wrongservice = GMLAN()/GMLAN_NR(requestServiceId=0x36, returnCode=0x22)
191    isotpsock2.send(wrongservice)
192    isotpsock2.send(pending)
193    ack = b"\x74"
194    isotpsock2.send(ack)
195
196thread = threading.Thread(target=ecusim)
197sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
198thread.start()
199started.wait(timeout=5)
200res = GMLAN_RequestDownload(isotpsock, 4, timeout=0.1) == True
201thread.join(timeout=5)
202assert res
203
204################### RETRY
205= Positive, first: immediate negative response, retry: Positive
206started = threading.Event()
def ecusim():
    """Simulated ECU: reject the first RequestDownload, accept the retried one."""
    global ecusimSuccessfullyExecuted
    # Start pessimistic: an exception raised in this worker thread is not
    # seen by the test, so the flag must not default to True.
    ecusimSuccessfullyExecuted = False
    # negative
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
    if len(requ) == 0:
        return
    expected = bytes(GMLAN()/GMLAN_RD(memorySize=4))
    first_ok = bytes(requ[0]) == expected
    nr = GMLAN()/GMLAN_NR(requestServiceId=0x34, returnCode=0x22)
    # positive retry: the negative response goes out via started_callback,
    # then the retried request is captured
    print("retry")
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda: isotpsock2.send(nr))
    print(requ)
    if len(requ) == 0:
        return
    # Both the original and the retried request must match the expected bytes.
    ecusimSuccessfullyExecuted = first_ok and bytes(requ[0]) == expected
    ack = b"\x74"
    isotpsock2.send(ack)
225
226thread = threading.Thread(target=ecusim)
227sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
228thread.start()
229started.wait(timeout=5)
230res = GMLAN_RequestDownload(isotpsock, 4, timeout=0.1, retry=1) == True
231thread.join(timeout=5)
232assert res
233assert ecusimSuccessfullyExecuted == True
234
235
236
237##############################################################################
238+ GMLAN_TransferData Tests
239##############################################################################
240= Positive, short payload, scheme = 4
241conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4
242payload = b"\x00\x11\x22\x33\x44\x55\x66\x77"
243ecusimSuccessfullyExecuted = True
244started = threading.Event()
245def ecusim():
246    global ecusimSuccessfullyExecuted
247    ecusimSuccessfullyExecuted= True
248    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
249    pkt = GMLAN() / GMLAN_TD(startingAddress=0x40000000,
250                             dataRecord=payload)
251    if bytes(requ[0]) != bytes(pkt):
252        ecusimSuccessfullyExecuted = False
253    ack = b"\x76"
254    isotpsock2.send(ack)
255
256thread = threading.Thread(target=ecusim)
257sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
258thread.start()
259started.wait(timeout=5)
260res = GMLAN_TransferData(isotpsock, 0x40000000, payload, timeout=1) == True
261thread.join(timeout=5)
262assert res
263assert ecusimSuccessfullyExecuted == True
264
265= Positive, short payload, scheme = 3
266conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 3
267payload = b"\x00\x11\x22\x33\x44\x55\x66\x77"
268ecusimSuccessfullyExecuted = True
269started = threading.Event()
270def ecusim():
271    global ecusimSuccessfullyExecuted
272    ecusimSuccessfullyExecuted= True
273    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
274    pkt = GMLAN() / GMLAN_TD(startingAddress=0x400000,
275                             dataRecord=payload)
276    if bytes(requ[0]) != bytes(pkt):
277        ecusimSuccessfullyExecuted = False
278    ack = b"\x76"
279    isotpsock2.send(ack)
280
281thread = threading.Thread(target=ecusim)
282sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
283thread.start()
284started.wait(timeout=5)
285res = GMLAN_TransferData(isotpsock, 0x400000, payload, timeout=1) == True
286thread.join(timeout=5)
287assert res
288assert ecusimSuccessfullyExecuted == True
289
290= Positive, short payload, scheme = 2
291conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 2
292payload = b"\x00\x11\x22\x33\x44\x55\x66\x77"
293ecusimSuccessfullyExecuted = True
294started = threading.Event()
295def ecusim():
296    global ecusimSuccessfullyExecuted
297    ecusimSuccessfullyExecuted = True
298    requ = isotpsock2.sniff(count=1, timeout=2, started_callback=started.set)
299    pkt = GMLAN() / GMLAN_TD(startingAddress=0x4000, dataRecord=payload)
300    if bytes(requ[0]) != bytes(pkt):
301        ecusimSuccessfullyExecuted = False
302    ack = b"\x76"
303    isotpsock2.send(ack)
304
305thread = threading.Thread(target=ecusim)
306sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
307thread.start()
308started.wait(timeout=5)
309res = GMLAN_TransferData(isotpsock, 0x4000, payload, timeout=2) == True
310thread.join(timeout=5)
311assert res
312assert ecusimSuccessfullyExecuted == True
313
314= Negative, short payload
315conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4
316payload = b"\x00\x11\x22\x33\x44\x55\x66\x77"
317ecusimSuccessfullyExecuted = True
318started = threading.Event()
319def ecusim():
320    global ecusimSuccessfullyExecuted
321    ecusimSuccessfullyExecuted= True
322    isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
323    nr = GMLAN() / GMLAN_NR(requestServiceId=0x36, returnCode=0x22)
324    isotpsock2.send(nr)
325
326thread = threading.Thread(target=ecusim)
327sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
328thread.start()
329started.wait(timeout=5)
330res = GMLAN_TransferData(isotpsock, 0x40000000, payload, timeout=1) == False
331thread.join(timeout=5)
332assert res
333
334= Negative, timeout
335
336assert GMLAN_TransferData(isotpsock, 0x4000, payload, timeout=0.1) == False
337
338
339= Positive, long payload
340conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4
341payload = b"\x00\x11\x22\x33\x44\x55\x66\x77"
342ecusimSuccessfullyExecuted = True
343started = threading.Event()
def ecusim():
    """Simulated ECU: verify two consecutive TransferData chunks and ack each."""
    global ecusimSuccessfullyExecuted
    # Start pessimistic: an exception raised in this worker thread is not
    # seen by the test, so the flag must not default to True.
    ecusimSuccessfullyExecuted = False
    ack = b"\x76"
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
    if len(requ) == 0:
        return
    pkt = GMLAN() / GMLAN_TD(startingAddress=0x40000000,
                             dataRecord=payload*2)
    first_ok = bytes(requ[0]) == bytes(pkt)
    # second packet with increased address; the ack for the first chunk is
    # sent through started_callback
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda: isotpsock2.send(ack))
    if len(requ) == 0:
        return
    pkt = GMLAN() / GMLAN_TD(startingAddress=0x40000010,
                             dataRecord=payload * 2)
    # Publish the verdict before the final ack so it is set when the client returns.
    ecusimSuccessfullyExecuted = first_ok and bytes(requ[0]) == bytes(pkt)
    isotpsock2.send(ack)
361
362thread = threading.Thread(target=ecusim)
363sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
364thread.start()
365started.wait(timeout=5)
366res = GMLAN_TransferData(isotpsock, 0x40000000, payload*4, maxmsglen=16, timeout=1) == True
367thread.join(timeout=5)
368assert res
369assert ecusimSuccessfullyExecuted == True
370
371
372#
373= Positive, first part of payload succeeds, second pending, then fails, retry succeeds
374conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4
375payload = b"\x00\x11\x22\x33\x44\x55\x66\x77"
376started = threading.Event()
def ecusim():
    """Simulated ECU: ack chunk one, answer chunk two with pending then negative, ack the retry."""
    positive = b"\x76"
    def send_on_start(response):
        # Build a started_callback that transmits `response`.
        return lambda: isotpsock2.send(response)
    # first chunk arrives
    isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
    # ack it, then wait for the second packet with increased address
    isotpsock2.sniff(count=1, timeout=1, started_callback=send_on_start(positive))
    isotpsock2.send(GMLAN() / GMLAN_NR(requestServiceId=0x36, returnCode=0x78))
    negative = GMLAN() / GMLAN_NR(requestServiceId=0x36, returnCode=0x22)
    # reject the second chunk and wait for the next request (the retry)
    isotpsock2.sniff(count=1, timeout=1, started_callback=send_on_start(negative))
    isotpsock2.send(positive)
388
389thread = threading.Thread(target=ecusim)
390sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
391thread.start()
392started.wait(timeout=5)
393res = GMLAN_TransferData(isotpsock, 0x40000000, payload*4, maxmsglen=16, timeout=0.1, retry=1) == True
394thread.join(timeout=5)
395assert res
396
397############
398= Positive, maxmsglen length check -> message is split automatically
399
400conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4
401payload = b"\x00\x11\x22\x33\x44\x55\x66\x77"
402ecusimSuccessfullyExecuted = True
403sim_started = threading.Event()
def ecusim():
    """Simulated ECU: expect the payload to arrive as two TransferData requests."""
    global ecusimSuccessfullyExecuted
    ecusimSuccessfullyExecuted = True
    positive = b"\x76"
    def arrived_as(sniffed, wanted):
        # True only when a packet was captured and it serialises like `wanted`.
        return len(sniffed) != 0 and bytes(sniffed[0]) == bytes(wanted)
    captured = isotpsock2.sniff(count=1, timeout=3, started_callback=sim_started.set)
    first_chunk = GMLAN() / GMLAN_TD(startingAddress=0x40000000,
                                     dataRecord=payload*511+payload[:1])
    if not arrived_as(captured, first_chunk):
        ecusimSuccessfullyExecuted = False
        return
    # second packet with increased address; the first chunk is acked via
    # started_callback
    captured = isotpsock2.sniff(count=1, timeout=3, started_callback=lambda: isotpsock2.send(positive))
    second_chunk = GMLAN() / GMLAN_TD(startingAddress=0x40000FF9,
                                      dataRecord=payload[1:])
    if not arrived_as(captured, second_chunk):
        ecusimSuccessfullyExecuted = False
        return
    isotpsock2.send(positive)
423
424thread = threading.Thread(target=ecusim)
425thread.name = "EcuSimulator" + thread.name
426sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
427thread.start()
428sim_started.wait(timeout=5)
429res = GMLAN_TransferData(isotpsock, 0x40000000, payload*512, maxmsglen=0x1000000, timeout=8) == True
430thread.join(timeout=5)
431assert res
432assert ecusimSuccessfullyExecuted == True
433
434############ Address boundary checks
435= Positive, highest possible address for scheme
436conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4
437payload = b"\x00\x11\x22\x33\x44\x55\x66\x77"
438started = threading.Event()
439def ecusim():
440    isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
441    ack = b"\x76"
442    isotpsock2.send(ack)
443
444thread = threading.Thread(target=ecusim)
445sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
446thread.start()
447started.wait(timeout=5)
448res = GMLAN_TransferData(isotpsock, 2**32 - 1, payload, timeout=1) == True
449thread.join(timeout=5)
450assert res
451
452= Negative, invalid address (too large for addressing scheme)
453conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4
454payload = b"\x00\x11\x22\x33\x44\x55\x66\x77"
455started = threading.Event()
456def ecusim():
457    isotpsock2.sniff(count=1, timeout=0.1, started_callback=started.set)
458    ack = b"\x76"
459    isotpsock2.send(ack)
460
461thread = threading.Thread(target=ecusim)
462sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
463thread.start()
464started.wait(timeout=5)
465res = GMLAN_TransferData(isotpsock, 2**32, payload, timeout=0.1) == False
466thread.join(timeout=5)
467assert res
468
469= Positive, address zero
470conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4
471ecusimSuccessfullyExecuted = True
472started = threading.Event()
473def ecusim():
474    isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
475    ack = b"\x76"
476    isotpsock2.send(ack)
477
478thread = threading.Thread(target=ecusim)
479sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
480thread.start()
481started.wait(timeout=5)
482res = GMLAN_TransferData(isotpsock, 0x00, payload, timeout=1) == True
483thread.join(timeout=5)
484assert res
485
486= Negative, negative address
487conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4
488payload = b"\x00\x11\x22\x33\x44\x55\x66\x77"
489started = threading.Event()
490def ecusim():
491    isotpsock2.sniff(count=1, timeout=0.1, started_callback=started.set)
492    ack = b"\x76"
493    isotpsock2.send(ack)
494
495thread = threading.Thread(target=ecusim)
496sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
497thread.start()
498started.wait(timeout=5)
499res = GMLAN_TransferData(isotpsock, -1, payload, timeout=0.1) == False
500thread.join(timeout=5)
501assert res
502
503############################################
504+ GMLAN_TransferPayload Tests
505############################################
506= Positive, short payload
507conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4
508payload = b"\x00\x11\x22\x33\x44\x55\x66\x77"
509ecusimSuccessfullyExecuted = True
510started = threading.Event()
511def ecusim():
512    global ecusimSuccessfullyExecuted
513    ecusimSuccessfullyExecuted= True
514    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
515    pkt = GMLAN()/GMLAN_RD(memorySize=len(payload))
516    if bytes(requ[0]) != bytes(pkt):
517        ecusimSuccessfullyExecuted = False
518    ack = b"\x74"
519    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack))
520    pkt = GMLAN() / GMLAN_TD(startingAddress=0x40000000,
521                             dataRecord=payload)
522    if bytes(requ[0]) != bytes(pkt):
523        ecusimSuccessfullyExecuted = False
524    ack = b"\x76"
525    isotpsock2.send(ack)
526
527thread = threading.Thread(target=ecusim)
528sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
529thread.start()
530started.wait(timeout=5)
531res = GMLAN_TransferPayload(isotpsock, 0x40000000, payload, timeout=1) == True
532thread.join(timeout=5)
533assert res
534assert ecusimSuccessfullyExecuted == True
535
536
537############################################
538+ GMLAN_GetSecurityAccess Tests
539############################################
540= KeyFunction
def keyfunc(seed):
    """Return the security key for `seed`: the seed minus 0x1FBE."""
    return seed - 0x1FBE
542
543= Positive scenario, level 1, tests if keyfunction applied properly
544ecusimSuccessfullyExecuted = True
545started = threading.Event()
def ecusim():
    """Simulated ECU: hand out seed 0xdead and accept only key 0xbeef."""
    global ecusimSuccessfullyExecuted
    # Start pessimistic: an exception raised in this worker thread is not
    # seen by the test, so the flag must not default to True.
    ecusimSuccessfullyExecuted = False
    # wait for request
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
    if len(requ) == 0:
        return
    pkt = GMLAN()/GMLAN_SA(subfunction=1)
    request_ok = bytes(requ[0]) == bytes(pkt)
    seedmsg = GMLAN()/GMLAN_SAPR(subfunction=1, securitySeed=0xdead)
    # wait for key; the seed is sent through started_callback
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda: isotpsock2.send(seedmsg))
    if len(requ) == 0:
        return
    pkt = GMLAN()/GMLAN_SA(subfunction=2, securityKey=0xbeef)
    if bytes(requ[0]) != bytes(pkt):
        # Wrong key: flag stays False and the client gets a negative response.
        nr = GMLAN() / GMLAN_NR(requestServiceId=0x27, returnCode=0x35)
        isotpsock2.send(nr)
    else:
        # Publish the verdict before answering so it is final when the
        # client sees the positive response.
        ecusimSuccessfullyExecuted = request_ok
        pr = GMLAN()/GMLAN_SAPR(subfunction=2)
        isotpsock2.send(pr)
565
566thread = threading.Thread(target=ecusim)
567sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
568thread.start()
569started.wait(timeout=5)
570res = GMLAN_GetSecurityAccess(isotpsock, keyfunc, level=1, timeout=0.1) == True
571thread.join(timeout=5)
572assert res
573assert ecusimSuccessfullyExecuted == True
574
575= Positive scenario, level 3
576ecusimSuccessfullyExecuted = True
577started = threading.Event()
578def ecusim():
579    global ecusimSuccessfullyExecuted
580    ecusimSuccessfullyExecuted= True
581    # wait for request
582    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
583    pkt = GMLAN()/GMLAN_SA(subfunction=3)
584    if bytes(requ[0]) != bytes(pkt):
585        ecusimSuccessfullyExecuted = False
586    seedmsg = GMLAN()/GMLAN_SAPR(subfunction=3, securitySeed=0xdead)
587    # wait for key
588    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(seedmsg))
589    pkt = GMLAN()/GMLAN_SA(subfunction=4, securityKey=0xbeef)
590    if bytes(requ[0]) != bytes(pkt):
591        ecusimSuccessfullyExecuted = False
592        nr = GMLAN() / GMLAN_NR(requestServiceId=0x27, returnCode=0x35)
593        isotpsock2.send(nr)
594    else:
595        pr = GMLAN()/GMLAN_SAPR(subfunction=4)
596        isotpsock2.send(pr)
597
598thread = threading.Thread(target=ecusim)
599sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
600thread.start()
601started.wait(timeout=5)
602res = GMLAN_GetSecurityAccess(isotpsock, keyfunc, level=3, timeout=0.1) == True
603thread.join(timeout=5)
604assert res
605assert ecusimSuccessfullyExecuted == True
606
607
608= Negative scenario, invalid password
609ecusimSuccessfullyExecuted = True
610started = threading.Event()
611def ecusim():
612    global ecusimSuccessfullyExecuted
613    ecusimSuccessfullyExecuted= True
614    # wait for request
615    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
616    pkt = GMLAN()/GMLAN_SA(subfunction=1)
617    if bytes(requ[0]) != bytes(pkt):
618        ecusimSuccessfullyExecuted = False
619    seedmsg = GMLAN()/GMLAN_SAPR(subfunction=1, securitySeed=0xdead)
620    # wait for key
621    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(seedmsg))
622    pkt = GMLAN()/GMLAN_SA(subfunction=2, securityKey=0xbabe)
623    if bytes(requ[0]) != bytes(pkt):
624        nr = GMLAN() / GMLAN_NR(requestServiceId=0x27, returnCode=0x35)
625        isotpsock2.send(nr)
626    else:
627        ecusimSuccessfullyExecuted = False
628        pr = GMLAN()/GMLAN_SAPR(subfunction=2)
629        isotpsock2.send(pr)
630
631thread = threading.Thread(target=ecusim)
632sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
633thread.start()
634started.wait(timeout=5)
635res = GMLAN_GetSecurityAccess(isotpsock, keyfunc, level=1, timeout=0.1) == False
636thread.join(timeout=5)
637assert res
638assert ecusimSuccessfullyExecuted == True
639
640= invalid level (not an odd number)
641assert GMLAN_GetSecurityAccess(isotpsock, keyfunc, level=2, timeout=1) == False
642
643= zero seed
644started = threading.Event()
645def ecusim():
646    # wait for request
647    isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
648    seedmsg = GMLAN()/GMLAN_SAPR(subfunction=1, securitySeed=0x0000)
649    isotpsock2.send(seedmsg)
650
651thread = threading.Thread(target=ecusim)
652sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
653thread.start()
654started.wait(timeout=5)
655res = GMLAN_GetSecurityAccess(isotpsock, keyfunc, level=1, timeout=0.1) == True
656thread.join(timeout=5)
657assert res
658
659############### retry
660= Positive scenario, request timeout, retry works
661started = threading.Event()
def ecusim():
    """Simulated ECU: leave the first seed request unanswered, then serve the retry."""
    # timeout: nothing is sent in reply to this first capture window
    isotpsock2.sniff(count=1, timeout=0.1, started_callback=started.set)
    # wait for request
    isotpsock2.sniff(count=1, timeout=3)
    seedmsg = GMLAN()/GMLAN_SAPR(subfunction=1, securitySeed=0xdead)
    # wait for key; the seed is sent through started_callback
    isotpsock2.sniff(count=1, timeout=1, started_callback=lambda: isotpsock2.send(seedmsg))
    # The received key is not verified in this scenario; always answer positively.
    pr = GMLAN()/GMLAN_SAPR(subfunction=2)
    isotpsock2.send(pr)
673
674thread = threading.Thread(target=ecusim)
675sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
676thread.start()
677started.wait(timeout=5)
678res = GMLAN_GetSecurityAccess(isotpsock, keyfunc, level=1, timeout=0.1, retry=1) == True
679thread.join(timeout=5)
680assert res
681
682= Positive scenario, keysend timeout, retry works
683started = threading.Event()
684def ecusim():
685    # wait for request
686    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
687    seedmsg = GMLAN()/GMLAN_SAPR(subfunction=1, securitySeed=0xdead)
688    # timeout
689    requ = isotpsock2.sniff(count=1, timeout=0.1, started_callback=lambda:isotpsock2.send(seedmsg))
690    # retry from start
691    requ = isotpsock2.sniff(count=1, timeout=3)
692    seedmsg = GMLAN()/GMLAN_SAPR(subfunction=1, securitySeed=0xdead)
693    # wait for key
694    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(seedmsg))
695    pr = GMLAN()/GMLAN_SAPR(subfunction=2)
696    isotpsock2.send(pr)
697
698thread = threading.Thread(target=ecusim)
699sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
700thread.start()
701started.wait(timeout=5)
702res = GMLAN_GetSecurityAccess(isotpsock, keyfunc, level=1, timeout=0.1, retry=1) == True
703thread.join(timeout=5)
704assert res
705
706
707= Positive scenario, request error, retry works
708started = threading.Event()
709def ecusim():
710    # wait for request
711    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
712    nr = GMLAN() / GMLAN_NR(requestServiceId=0x27, returnCode=0x37)
713    # wait for request
714    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(nr))
715    seedmsg = GMLAN()/GMLAN_SAPR(subfunction=1, securitySeed=0xdead)
716    # wait for key
717    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(seedmsg))
718    pkt = GMLAN()/GMLAN_SA(subfunction=2, securityKey=0xbeef)
719    pr = GMLAN()/GMLAN_SAPR(subfunction=2)
720    isotpsock2.send(pr)
721
722thread = threading.Thread(target=ecusim)
723sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
724thread.start()
725started.wait(timeout=5)
726res = GMLAN_GetSecurityAccess(isotpsock, keyfunc, level=1, timeout=0.1, retry=1) == True
727thread.join(timeout=5)
728assert res
729
730
731##############################################################################
732+ GMLAN_InitDiagnostics Tests
733##############################################################################
734= sequence of the correct messages
735ecusimSuccessfullyExecuted = True
736started = threading.Event()
737def ecusim():
738    global ecusimSuccessfullyExecuted
739    ecusimSuccessfullyExecuted= True
740    # DisableNormalCommunication
741    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
742    pkt = GMLAN(b"\x28")
743    if bytes(requ[0]) != bytes(pkt):
744        ecusimSuccessfullyExecuted = False
745    ack = b"\x68"
746    # ReportProgrammedState
747    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack))
748    pkt = GMLAN(b"\xa2")
749    if bytes(requ[0]) != bytes(pkt):
750        ecusimSuccessfullyExecuted = False
751    ack = GMLAN()/GMLAN_RPSPR(programmedState=0)
752    # ProgrammingMode requestProgramming
753    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack))
754    pkt = GMLAN() / GMLAN_PM(subfunction=0x1)
755    if bytes(requ[0]) != bytes(pkt):
756        ecusimSuccessfullyExecuted = False
757    ack = GMLAN(b"\xe5")
758    # InitiateProgramming enableProgramming
759    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack))
760    pkt = GMLAN() / GMLAN_PM(subfunction=0x3)
761    if bytes(requ[0]) != bytes(pkt):
762        ecusimSuccessfullyExecuted = False
763
764thread = threading.Thread(target=ecusim)
765
766sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
767thread.start()
768started.wait(timeout=5)
769res = GMLAN_InitDiagnostics(isotpsock, timeout=0.1, unittest=True) == True
770thread.join(timeout=5)
771assert res
772assert ecusimSuccessfullyExecuted == True
773
774
775= sequence of the correct messages, disablenormalcommunication as broadcast
776ecusimSuccessfullyExecuted = True
777started = threading.Event()
778
779broadcastsender = TestSocket(CAN)
780broadcastrcv = TestSocket(CAN)
781broadcastsender.pair(broadcastrcv)
def ecusim():
    """Simulated ECU: expect a broadcast DisableNormalCommunication, then the diagnostic sequence."""
    global ecusimSuccessfullyExecuted
    # Start pessimistic: an assert or IndexError raised in this worker thread
    # is not seen by the test, so the flag must not default to True.
    ecusimSuccessfullyExecuted = False
    print("DisableNormalCommunication")
    requ = broadcastrcv.sniff(count=1, timeout=2, started_callback=started.set)
    if len(requ) == 0:
        return
    # Only the first three data bytes of the broadcast CAN frame are compared.
    all_ok = bytes(requ[0].data)[0:3] == b"\xfe\x01\x28"
    print("ReportProgrammedState")
    requ = isotpsock2.sniff(count=1, timeout=2)
    if len(requ) == 0:
        return
    all_ok = all_ok and bytes(requ[0]) == bytes(GMLAN(b"\xa2"))
    state_response = GMLAN()/GMLAN_RPSPR(programmedState=0)
    print("ProgrammingMode requestProgramming")
    requ = isotpsock2.sniff(count=1, timeout=3, started_callback=lambda: isotpsock2.send(state_response))
    if len(requ) == 0:
        return
    all_ok = all_ok and bytes(requ[0]) == bytes(GMLAN() / GMLAN_PM(subfunction=0x1))
    mode_response = GMLAN(b"\xe5")
    print("InitiateProgramming enableProgramming")
    requ = isotpsock2.sniff(count=1, timeout=3, started_callback=lambda: isotpsock2.send(mode_response))
    if len(requ) == 0:
        return
    # True only if every request in the sequence matched.
    ecusimSuccessfullyExecuted = all_ok and bytes(requ[0]) == bytes(GMLAN() / GMLAN_PM(subfunction=0x3))
807
808thread = threading.Thread(target=ecusim)
809sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
810thread.start()
811started.wait(timeout=5)
812res = GMLAN_InitDiagnostics(isotpsock, broadcast_socket=GMLAN_BroadcastSocket(broadcastsender), timeout=5, unittest=True) == True
813thread.join(timeout=5)
814assert res
815assert ecusimSuccessfullyExecuted == True
816
817
818######## timeout
819= timeout DisableNormalCommunication
820ecusimSuccessfullyExecuted = True
821started = threading.Event()
def ecusim():
    """Simulated ECU: check the DisableNormalCommunication request and never answer it."""
    global ecusimSuccessfullyExecuted
    # Start pessimistic: an exception raised in this worker thread is not
    # seen by the test, so the flag must not default to True.
    ecusimSuccessfullyExecuted = False
    # DisableNormalCommunication
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
    # An empty capture counts as a failure instead of raising IndexError.
    ecusimSuccessfullyExecuted = len(requ) > 0 and bytes(requ[0]) == bytes(GMLAN(b"\x28"))
830
831thread = threading.Thread(target=ecusim)
832sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
833thread.start()
834started.wait(timeout=5)
835res = GMLAN_InitDiagnostics(isotpsock, timeout=0.1, unittest=True) == False
836thread.join(timeout=5)
837assert res
838assert ecusimSuccessfullyExecuted == True
839
840
841= timeout ReportProgrammedState
# Pessimistic default: the flag only becomes True once every expected request
# has been captured and verified by the simulator.
ecusimSuccessfullyExecuted = False
started = threading.Event()
def ecusim():
    """Acknowledge DisableNormalCommunication, then answer ReportProgrammedState.
    Nothing further is answered after the GMLAN_RPSPR response.
    """
    global ecusimSuccessfullyExecuted
    ecusimSuccessfullyExecuted = False
    # DisableNormalCommunication
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
    # Guard the length before indexing: an empty capture must count as a
    # failure instead of raising IndexError inside the thread.
    if len(requ) == 0 or bytes(requ[0]) != bytes(GMLAN(b"\x28")):
        return
    ack = b"\x68"
    # ReportProgrammedState (the 0x68 acknowledgement goes out as sniffing starts)
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda: isotpsock2.send(ack))
    if len(requ) == 0 or bytes(requ[0]) != bytes(GMLAN(b"\xa2")):
        return
    isotpsock2.send(GMLAN()/GMLAN_RPSPR(programmedState=0))
    ecusimSuccessfullyExecuted = True

thread = threading.Thread(target=ecusim)
sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
thread.start()
started.wait(timeout=5)
res = GMLAN_InitDiagnostics(isotpsock, timeout=0.1, unittest=True) == False
thread.join(timeout=5)
assert res
assert ecusimSuccessfullyExecuted
869
870
871= timeout ProgrammingMode requestProgramming
# Pessimistic default: the flag only becomes True once every expected request
# has been captured and verified by the simulator.
ecusimSuccessfullyExecuted = False
started = threading.Event()
def ecusim():
    """Answer the first three init requests and leave the rest unanswered.
    The simulator acknowledges DisableNormalCommunication, answers
    ReportProgrammedState and ProgrammingMode requestProgramming, and then
    stops.
    """
    global ecusimSuccessfullyExecuted
    ecusimSuccessfullyExecuted = False
    # DisableNormalCommunication
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
    # Guard the length before indexing: an empty capture must count as a
    # failure instead of raising IndexError inside the thread.
    if len(requ) == 0 or bytes(requ[0]) != bytes(GMLAN(b"\x28")):
        return
    ack = b"\x68"
    # ReportProgrammedState
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda: isotpsock2.send(ack))
    if len(requ) == 0 or bytes(requ[0]) != bytes(GMLAN(b"\xa2")):
        return
    ack = GMLAN()/GMLAN_RPSPR(programmedState=0)
    # ProgrammingMode requestProgramming
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda: isotpsock2.send(ack))
    if len(requ) == 0 or bytes(requ[0]) != bytes(GMLAN() / GMLAN_PM(subfunction=0x1)):
        return
    ecusimSuccessfullyExecuted = True

thread = threading.Thread(target=ecusim)
sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
thread.start()
started.wait(timeout=5)
res = GMLAN_InitDiagnostics(isotpsock, timeout=0.1, unittest=True) == False
thread.join(timeout=5)
assert res
assert ecusimSuccessfullyExecuted
903
904###### negative response
905= negative response DisableNormalCommunication
# Pessimistic default: the flag only becomes True once the simulator has
# captured the expected request and sent its negative response.
ecusimSuccessfullyExecuted = False
started = threading.Event()
def ecusim():
    """Reject DisableNormalCommunication with a negative response (0x12)."""
    global ecusimSuccessfullyExecuted
    ecusimSuccessfullyExecuted = False
    # DisableNormalCommunication
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
    # Guard the length before indexing: an empty capture must count as a
    # failure instead of raising IndexError inside the thread.
    if len(requ) == 0 or bytes(requ[0]) != bytes(GMLAN(b"\x28")):
        return
    isotpsock2.send(GMLAN() / GMLAN_NR(requestServiceId=0x28, returnCode=0x12))
    ecusimSuccessfullyExecuted = True

thread = threading.Thread(target=ecusim)
sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
thread.start()
started.wait(timeout=5)
res = GMLAN_InitDiagnostics(isotpsock, timeout=0.1, unittest=True) == False
thread.join(timeout=5)
assert res
assert ecusimSuccessfullyExecuted
928
929###### retry tests
930= sequence of the correct messages, retry set
started = threading.Event()
def ecusim():
    """Answer the complete init sequence positively, one request at a time.
    Each response is sent from the started_callback of the sniff that waits
    for the following request.
    """
    # DisableNormalCommunication
    isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
    ack = b"\x68"
    # ReportProgrammedState
    isotpsock2.sniff(count=1, timeout=1, started_callback=lambda: isotpsock2.send(ack))
    ack = GMLAN()/GMLAN_RPSPR(programmedState=0)
    # ProgrammingMode requestProgramming
    isotpsock2.sniff(count=1, timeout=1, started_callback=lambda: isotpsock2.send(ack))
    ack = GMLAN(b"\xe5")
    # InitiateProgramming enableProgramming
    isotpsock2.sniff(count=1, timeout=1, started_callback=lambda: isotpsock2.send(ack))

thread = threading.Thread(target=ecusim)
sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
thread.start()
started.wait(timeout=5)
res = GMLAN_InitDiagnostics(isotpsock, timeout=1, retry=0, unittest=True) == True
# Join before asserting, so a failing assertion cannot leave the simulator
# thread running on the shared sockets.
thread.join(timeout=5)
assert res
952
953
954= negative response, make sure no retries are made
ecusimSuccessfullyExecuted = True
started = threading.Event()
def ecusim():
    """Reject the first request and verify that no second one follows."""
    global ecusimSuccessfullyExecuted
    # Wait for the initial request (short 0.1 s window).
    isotpsock2.sniff(count=1, timeout=0.1, started_callback=started.set)
    negative = GMLAN() / GMLAN_NR(requestServiceId=0x28, returnCode=0x12)
    # Send the negative response, then listen for an unwanted retry.
    retried = isotpsock2.sniff(count=1, timeout=0.1, started_callback=lambda: isotpsock2.send(negative))
    # With retry=0 on the tester side, any further captured packet is a failure.
    ecusimSuccessfullyExecuted = len(retried) == 0

thread = threading.Thread(target=ecusim)
sniff(opened_socket=[isotpsock, isotpsock2], timeout=0.01)
thread.start()
started.wait(timeout=5)
res = GMLAN_InitDiagnostics(isotpsock, retry=0, timeout=0.1, unittest=True) == False
thread.join(timeout=5)
assert res
assert ecusimSuccessfullyExecuted
974
975
976= first fail at DisableNormalCommunication, then sequence of the correct messages
started = threading.Event()
def ecusim():
    """Reject the first DisableNormalCommunication, then answer the retry positively."""
    def answer_and_wait(response):
        # Send `response` from the sniffer's started_callback, then wait up to 1 s for one more request.
        isotpsock2.sniff(count=1, timeout=1, started_callback=lambda: isotpsock2.send(response))
    # First attempt: only capture the request and signal readiness.
    isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
    # Negative response; the retried DisableNormalCommunication is awaited next.
    answer_and_wait(GMLAN() / GMLAN_NR(requestServiceId=0x28, returnCode=0x12))
    # Positive response 0x68; await ReportProgrammedState.
    answer_and_wait(b"\x68")
    # Programmed state response; await ProgrammingMode requestProgramming.
    answer_and_wait(GMLAN()/GMLAN_RPSPR(programmedState=0))
    # Positive response 0xe5; await InitiateProgramming enableProgramming.
    answer_and_wait(GMLAN(b"\xe5"))

thread = threading.Thread(target=ecusim)
sniff(opened_socket=[isotpsock, isotpsock2], timeout=0.01)
thread.start()
started.wait(timeout=5)
res = GMLAN_InitDiagnostics(isotpsock, retry=1, timeout=0.1, unittest=True) == True
thread.join(timeout=5)
# One retry is allowed, so the second pass must succeed.
assert res
1000
1001= first fail at ReportProgrammedState, then sequence of the correct messages
started = threading.Event()
def ecusim():
    """Fail the first pass at ReportProgrammedState, then answer the retry positively."""
    # Pass 1: DisableNormalCommunication arrives.
    isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
    disable_ok = b"\x68"
    # Pass 1: acknowledge it and capture ReportProgrammedState (about to be rejected).
    isotpsock2.sniff(count=1, timeout=1, started_callback=lambda: isotpsock2.send(disable_ok))
    rps_rejected = GMLAN() / GMLAN_NR(requestServiceId=0xA2, returnCode=0x12)
    # Pass 2: reject, then capture the retried DisableNormalCommunication.
    isotpsock2.sniff(count=1, timeout=1, started_callback=lambda: isotpsock2.send(rps_rejected))
    disable_ok_again = b"\x68"
    # Pass 2: ReportProgrammedState
    isotpsock2.sniff(count=1, timeout=1, started_callback=lambda: isotpsock2.send(disable_ok_again))
    programmed_state = GMLAN()/GMLAN_RPSPR(programmedState=0)
    # Pass 2: ProgrammingMode requestProgramming
    isotpsock2.sniff(count=1, timeout=1, started_callback=lambda: isotpsock2.send(programmed_state))
    programming_ok = GMLAN(b"\xe5")
    # Pass 2: InitiateProgramming enableProgramming
    isotpsock2.sniff(count=1, timeout=1, started_callback=lambda: isotpsock2.send(programming_ok))

thread = threading.Thread(target=ecusim)
sniff(opened_socket=[isotpsock, isotpsock2], timeout=0.01)
thread.start()
started.wait(timeout=5)
res = GMLAN_InitDiagnostics(isotpsock, retry=1, timeout=1, unittest=True) == True
thread.join(timeout=5)
# One retry is allowed, so the second pass must succeed.
assert res
1029
1030= first fail at ProgrammingMode requestProgramming, then sequence of the correct messages
started = threading.Event()
def ecusim():
    """Fail the first pass at ProgrammingMode requestProgramming, then answer the retry."""
    def answer_and_wait(response):
        # Send `response` from the sniffer's started_callback, then wait up to 1 s for one more request.
        isotpsock2.sniff(count=1, timeout=1, started_callback=lambda: isotpsock2.send(response))
    # Pass 1: DisableNormalCommunication
    isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
    # Pass 1: acknowledge; await ReportProgrammedState
    answer_and_wait(b"\x68")
    # Pass 1: programmed state; await the request that gets rejected
    answer_and_wait(GMLAN()/GMLAN_RPSPR(programmedState=0))
    # Pass 2: negative response for 0xA5; await retried DisableNormalCommunication
    answer_and_wait(GMLAN() / GMLAN_NR(requestServiceId=0xA5, returnCode=0x12))
    # Pass 2: acknowledge; await ReportProgrammedState
    answer_and_wait(b"\x68")
    # Pass 2: programmed state; await ProgrammingMode requestProgramming
    answer_and_wait(GMLAN()/GMLAN_RPSPR(programmedState=0))
    # The final positive response is sent directly instead of from a callback.
    isotpsock2.send(GMLAN(b"\xe5"))
    # InitiateProgramming enableProgramming
    isotpsock2.sniff(count=1, timeout=1)

thread = threading.Thread(target=ecusim)
sniff(opened_socket=[isotpsock, isotpsock2], timeout=0.01)
thread.start()
started.wait(timeout=5)
res = GMLAN_InitDiagnostics(isotpsock, retry=1, timeout=1, unittest=True) == True
thread.join(timeout=5)
# One retry is allowed, so the second pass must succeed.
assert res
1062
1063= fail twice
ecusimSuccessfullyExecuted = True
started = threading.Event()
def ecusim():
    """Reject two consecutive requests and verify that no third one follows."""
    global ecusimSuccessfullyExecuted
    def rejection():
        # A fresh negative response for service 0x28 on every call.
        return GMLAN() / GMLAN_NR(requestServiceId=0x28, returnCode=0x12)
    # Initial request
    isotpsock2.sniff(count=1, timeout=0.1, started_callback=started.set)
    first_nack = rejection()
    # Reject it and capture the single permitted retry.
    isotpsock2.sniff(count=1, timeout=0.1, started_callback=lambda: isotpsock2.send(first_nack))
    second_nack = rejection()
    # Reject the retry as well; nothing more may arrive afterwards.
    extra = isotpsock2.sniff(count=1, timeout=0.1, started_callback=lambda: isotpsock2.send(second_nack))
    ecusimSuccessfullyExecuted = len(extra) == 0

thread = threading.Thread(target=ecusim)
sniff(opened_socket=[isotpsock, isotpsock2], timeout=0.01)
thread.start()
started.wait(timeout=5)
res = GMLAN_InitDiagnostics(isotpsock, retry=1, timeout=0.1, unittest=True) == False
thread.join(timeout=5)
assert res
assert ecusimSuccessfullyExecuted
1085
1086##############################################################################
1087+ GMLAN_ReadMemoryByAddress Tests
1088##############################################################################
1089= Positive, short length, scheme = 4
conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4
payload = b"\x00\x11\x22\x33\x44\x55\x66\x77"
# Pessimistic default: the flag only becomes True once the simulator has
# captured the expected request and sent its answer.
ecusimSuccessfullyExecuted = False
started = threading.Event()
def ecusim():
    """Answer a ReadMemoryByAddress request for 8 bytes at address 0 with `payload`."""
    global ecusimSuccessfullyExecuted
    ecusimSuccessfullyExecuted = False
    requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
    pkt = GMLAN() / GMLAN_RMBA(memoryAddress=0x0, memorySize=0x8)
    # Guard the length before indexing: an empty capture must count as a
    # failure instead of raising IndexError inside the thread.
    if len(requ) == 0 or bytes(requ[0]) != bytes(pkt):
        return
    isotpsock2.send(GMLAN() / GMLAN_RMBAPR(memoryAddress=0x0, dataRecord=payload))
    ecusimSuccessfullyExecuted = True

thread = threading.Thread(target=ecusim)
sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2])
thread.start()
started.wait(timeout=5)
res = GMLAN_ReadMemoryByAddress(isotpsock, 0x0, 0x8, timeout=1) == payload
thread.join(timeout=5)
assert res
assert ecusimSuccessfullyExecuted
1112
1113
1114= Negative, negative response
conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4
payload = b"\x00\x11\x22\x33\x44\x55\x66\x77"
started = threading.Event()
def ecusim():
    """Reject the ReadMemoryByAddress request (service 0x23) with return code 0x31."""
    isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
    isotpsock2.send(GMLAN() / GMLAN_NR(requestServiceId=0x23, returnCode=0x31))

thread = threading.Thread(target=ecusim)
sniff(opened_socket=[isotpsock, isotpsock2], timeout=0.01)
thread.start()
started.wait(timeout=5)
# A negative response must surface to the caller as None.
res = GMLAN_ReadMemoryByAddress(isotpsock, 0x0, 0x8, timeout=1) is None
thread.join(timeout=5)
assert res
1130
1131= Negative, timeout
# No simulator thread is started for this test, so the 10 ms wait is expected
# to elapse and the read to return None.
assert None is GMLAN_ReadMemoryByAddress(isotpsock, 0, 8, timeout=0.01)
1133
1134###### RETRY
1135= Positive, negative response, retry succeeds
conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4
payload = b"\x00\x11\x22\x33\x44\x55\x66\x77"
started = threading.Event()
def ecusim():
    """Reject the first ReadMemoryByAddress request, then answer the retry with `payload`."""
    # First attempt
    isotpsock2.sniff(count=1, timeout=1, started_callback=started.set)
    rejection = GMLAN() / GMLAN_NR(requestServiceId=0x23, returnCode=0x31)
    # Send the negative response, then capture the retried request.
    isotpsock2.sniff(count=1, timeout=1, started_callback=lambda: isotpsock2.send(rejection))
    isotpsock2.send(GMLAN() / GMLAN_RMBAPR(memoryAddress=0x0, dataRecord=payload))

thread = threading.Thread(target=ecusim)
sniff(opened_socket=[isotpsock, isotpsock2], timeout=0.01)
thread.start()
started.wait(timeout=5)
# With retry=1 the second attempt is expected to deliver the payload.
res = GMLAN_ReadMemoryByAddress(isotpsock, 0x0, 0x8, retry=1, timeout=1) == payload
thread.join(timeout=5)
assert res
1153
1154+ Cleanup
1155
1156= Delete TestSockets
1157
# Final teardown step of the campaign.
# NOTE(review): presumably closes/unregisters the TestSocket instances created
# above; the helper is imported from test.testsocket - confirm there.
1158cleanup_testsockets()
1159