1% Regression tests for gmlanutil 2~ scanner 3 4+ Configuration 5~ conf 6 7 8= Imports 9 10from scapy.contrib.automotive import log_automotive 11from test.testsocket import TestSocket, cleanup_testsockets 12import logging 13 14############ 15############ 16+ Load general modules 17 18= Load contribution layer 19 20load_layer("can", globals_dict=globals()) 21load_contrib("automotive.gm.gmlan", globals_dict=globals()) 22load_contrib("automotive.gm.gmlanutils", globals_dict=globals()) 23 24log_automotive.setLevel(logging.DEBUG) 25 26= Define test sockets 27 28isotpsock2 = TestSocket(GMLAN) 29isotpsock = TestSocket(GMLAN) 30isotpsock2.pair(isotpsock) 31 32############################################################################## 33+ GMLAN_RequestDownload Tests 34############################################################################## 35= Positive, immediate positive response 36 37ecusimSuccessfullyExecuted = False 38started = threading.Event() 39 40def ecusim(): 41 global ecusimSuccessfullyExecuted 42 ecusimSuccessfullyExecuted= True 43 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) 44 pkt = GMLAN()/GMLAN_RD(memorySize=4) 45 if bytes(requ[0]) != bytes(pkt): 46 ecusimSuccessfullyExecuted = False 47 else: 48 ecusimSuccessfullyExecuted = True 49 ack = b"\x74" 50 isotpsock2.send(ack) 51 52thread = threading.Thread(target=ecusim) 53sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 54thread.start() 55started.wait(timeout=5) 56res = GMLAN_RequestDownload(isotpsock, 4, timeout=1) == True 57thread.join(timeout=5) 58 59assert res 60assert ecusimSuccessfullyExecuted == True 61 62= Negative, immediate negative response 63 64started = threading.Event() 65def ecusim(): 66 isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) 67 nr = GMLAN()/GMLAN_NR(requestServiceId=0x34, returnCode=0x22) 68 isotpsock2.send(nr) 69 70thread = threading.Thread(target=ecusim) 71sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 72thread.start() 73started.wait(timeout=5) 74res = GMLAN_RequestDownload(isotpsock, 4, timeout=1) == False 75thread.join(timeout=5) 76assert res 77 78= Negative, timeout 79assert GMLAN_RequestDownload(isotpsock, 4, timeout=0.01) == False 80 81############################ Response pending 82= Positive, after response pending 83started = threading.Event() 84 85def ecusim(): 86 isotpsock2.sniff(count=1, timeout=2, started_callback=started.set) 87 pending = GMLAN()/GMLAN_NR(requestServiceId=0x34, returnCode=0x78) 88 isotpsock2.send(pending) 89 ack = b"\x74" 90 isotpsock2.send(ack) 91 92thread = threading.Thread(target=ecusim) 93sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 94thread.start() 95started.wait(timeout=5) 96res = GMLAN_RequestDownload(isotpsock, 4, timeout=2) == True 97thread.join(timeout=5) 98assert res 99 100= Positive, hold response pending for several messages 101tout = 0.1 102repeats = 4 103started = threading.Event() 104 105def ecusim(): 106 isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) 107 ack = GMLAN()/GMLAN_NR(requestServiceId=0x34, returnCode=0x78) 108 for i in range(repeats): 109 isotpsock2.send(ack) 110 time.sleep(tout) 111 ack = b"\x74" 112 isotpsock2.send(ack) 113 114thread = threading.Thread(target=ecusim) 115sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 116thread.start() 117started.wait(timeout=5) 118starttime = time.time() # may be inaccurate -> on some systems only seconds precision 119result = GMLAN_RequestDownload(isotpsock, 4, timeout=repeats*tout+0.5) 120endtime = time.time() + 1 121thread.join(timeout=5) 122assert result 123print(endtime - starttime) 124print(tout * (repeats - 1)) 125assert (endtime - starttime) >= tout * (repeats - 1) 126 127 128= Negative, negative response after response pending 129started = threading.Event() 130def ecusim(): 131 isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) 132 pending = GMLAN()/GMLAN_NR(requestServiceId=0x34, returnCode=0x78) 133 isotpsock2.send(pending) 134 nr = GMLAN()/GMLAN_NR(requestServiceId=0x34, returnCode=0x22) 135 isotpsock2.send(nr) 136 137thread = threading.Thread(target=ecusim) 138sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 139thread.start() 140started.wait(timeout=5) 141res = GMLAN_RequestDownload(isotpsock, 4, timeout=0.1) == False 142thread.join(timeout=5) 143assert res 144 145 146= Negative, timeout after response pending 147started = threading.Event() 148def ecusim(): 149 isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) 150 pending = GMLAN()/GMLAN_NR(requestServiceId=0x34, returnCode=0x78) 151 isotpsock2.send(pending) 152 153thread = threading.Thread(target=ecusim) 154sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 155thread.start() 156started.wait(timeout=5) 157res = GMLAN_RequestDownload(isotpsock, 4, timeout=0.1) == False 158thread.join(timeout=5) 159assert res 160 161 162 163= Positive, pending message from different service interferes while pending 164started = threading.Event() 165def ecusim(): 166 isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) 167 pending = GMLAN()/GMLAN_NR(requestServiceId=0x34, returnCode=0x78) 168 isotpsock2.send(pending) 169 wrongservice = GMLAN()/GMLAN_NR(requestServiceId=0x36, returnCode=0x78) 170 isotpsock2.send(wrongservice) 171 isotpsock2.send(pending) 172 ack = b"\x74" 173 isotpsock2.send(ack) 174 175thread = threading.Thread(target=ecusim) 176sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 177thread.start() 178started.wait(timeout=5) 179res = GMLAN_RequestDownload(isotpsock, 4, timeout=0.1) == True 180thread.join(timeout=5) 181assert res 182 183= Positive, negative response from different service interferes while pending 184started = threading.Event() 185 186def ecusim(): 187 isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) 188 pending = GMLAN()/GMLAN_NR(requestServiceId=0x34, returnCode=0x78) 189 isotpsock2.send(pending) 190 wrongservice = GMLAN()/GMLAN_NR(requestServiceId=0x36, returnCode=0x22) 191 isotpsock2.send(wrongservice) 192 isotpsock2.send(pending) 193 ack = b"\x74" 194 isotpsock2.send(ack) 195 196thread = threading.Thread(target=ecusim) 197sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 198thread.start() 199started.wait(timeout=5) 200res = GMLAN_RequestDownload(isotpsock, 4, timeout=0.1) == True 201thread.join(timeout=5) 202assert res 203 204################### RETRY 205= Positive, first: immediate negative response, retry: Positive 206started = threading.Event() 207def ecusim(): 208 global ecusimSuccessfullyExecuted 209 ecusimSuccessfullyExecuted= True 210 # negative 211 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) 212 pkt = GMLAN()/GMLAN_RD(memorySize=4) 213 if bytes(requ[0]) != bytes(pkt): 214 ecusimSuccessfullyExecuted = False 215 nr = GMLAN()/GMLAN_NR(requestServiceId=0x34, returnCode=0x22) 216 # positive retry 217 print("retry") 218 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(nr)) 219 pkt = GMLAN()/GMLAN_RD(memorySize=4) 220 print(requ) 221 if bytes(requ[0]) != bytes(pkt): 222 ecusimSuccessfullyExecuted = False 223 ack = b"\x74" 224 isotpsock2.send(ack) 225 226thread = threading.Thread(target=ecusim) 227sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 228thread.start() 229started.wait(timeout=5) 230res = GMLAN_RequestDownload(isotpsock, 4, timeout=0.1, retry=1) == True 231thread.join(timeout=5) 232assert res 233assert ecusimSuccessfullyExecuted == True 234 235 236 237############################################################################## 238+ GMLAN_TransferData Tests 239############################################################################## 240= Positive, short payload, scheme = 4 241conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 242payload = b"\x00\x11\x22\x33\x44\x55\x66\x77" 243ecusimSuccessfullyExecuted = True 244started = threading.Event() 245def ecusim(): 246 global ecusimSuccessfullyExecuted 247 ecusimSuccessfullyExecuted= True 248 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) 249 pkt = GMLAN() / GMLAN_TD(startingAddress=0x40000000, 250 dataRecord=payload) 251 if bytes(requ[0]) != bytes(pkt): 252 ecusimSuccessfullyExecuted = False 253 ack = b"\x76" 254 isotpsock2.send(ack) 255 256thread = threading.Thread(target=ecusim) 257sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 258thread.start() 259started.wait(timeout=5) 260res = GMLAN_TransferData(isotpsock, 0x40000000, payload, timeout=1) == True 261thread.join(timeout=5) 262assert res 263assert ecusimSuccessfullyExecuted == True 264 265= Positive, short payload, scheme = 3 266conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 3 267payload = b"\x00\x11\x22\x33\x44\x55\x66\x77" 268ecusimSuccessfullyExecuted = True 269started = threading.Event() 270def ecusim(): 271 global ecusimSuccessfullyExecuted 272 ecusimSuccessfullyExecuted= True 273 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) 274 pkt = GMLAN() / GMLAN_TD(startingAddress=0x400000, 275 dataRecord=payload) 276 if bytes(requ[0]) != bytes(pkt): 277 ecusimSuccessfullyExecuted = False 278 ack = b"\x76" 279 isotpsock2.send(ack) 280 281thread = threading.Thread(target=ecusim) 282sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 283thread.start() 284started.wait(timeout=5) 285res = GMLAN_TransferData(isotpsock, 0x400000, payload, timeout=1) == True 286thread.join(timeout=5) 287assert res 288assert ecusimSuccessfullyExecuted == True 289 290= Positive, short payload, scheme = 2 291conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 2 292payload = b"\x00\x11\x22\x33\x44\x55\x66\x77" 293ecusimSuccessfullyExecuted = True 294started = threading.Event() 295def ecusim(): 296 global ecusimSuccessfullyExecuted 297 ecusimSuccessfullyExecuted = True 298 requ = isotpsock2.sniff(count=1, timeout=2, started_callback=started.set) 299 pkt = GMLAN() / GMLAN_TD(startingAddress=0x4000, dataRecord=payload) 300 if bytes(requ[0]) != bytes(pkt): 301 ecusimSuccessfullyExecuted = False 302 ack = b"\x76" 303 isotpsock2.send(ack) 304 305thread = threading.Thread(target=ecusim) 306sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 307thread.start() 308started.wait(timeout=5) 309res = GMLAN_TransferData(isotpsock, 0x4000, payload, timeout=2) == True 310thread.join(timeout=5) 311assert res 312assert ecusimSuccessfullyExecuted == True 313 314= Negative, short payload 315conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 316payload = b"\x00\x11\x22\x33\x44\x55\x66\x77" 317ecusimSuccessfullyExecuted = True 318started = threading.Event() 319def ecusim(): 320 global ecusimSuccessfullyExecuted 321 ecusimSuccessfullyExecuted= True 322 isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) 323 nr = GMLAN() / GMLAN_NR(requestServiceId=0x36, returnCode=0x22) 324 isotpsock2.send(nr) 325 326thread = threading.Thread(target=ecusim) 327sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 328thread.start() 329started.wait(timeout=5) 330res = GMLAN_TransferData(isotpsock, 0x40000000, payload, timeout=1) == False 331thread.join(timeout=5) 332assert res 333 334= Negative, timeout 335 336assert GMLAN_TransferData(isotpsock, 0x4000, payload, timeout=0.1) == False 337 338 339= Positive, long payload 340conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 341payload = b"\x00\x11\x22\x33\x44\x55\x66\x77" 342ecusimSuccessfullyExecuted = True 343started = threading.Event() 344def ecusim(): 345 global ecusimSuccessfullyExecuted 346 ecusimSuccessfullyExecuted= True 347 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) 348 pkt = GMLAN() / GMLAN_TD(startingAddress=0x40000000, 349 dataRecord=payload*2) 350 if bytes(requ[0]) != bytes(pkt): 351 ecusimSuccessfullyExecuted = False 352 ack = b"\x76" 353 # second package with inscreased address 354 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) 355 pkt = GMLAN() / GMLAN_TD(startingAddress=0x40000010, 356 dataRecord=payload * 2) 357 if bytes(requ[0]) != bytes(pkt): 358 ecusimSuccessfullyExecuted = False 359 ack = b"\x76" 360 isotpsock2.send(ack) 361 362thread = threading.Thread(target=ecusim) 363sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 364thread.start() 365started.wait(timeout=5) 366res = GMLAN_TransferData(isotpsock, 0x40000000, payload*4, maxmsglen=16, timeout=1) == True 367thread.join(timeout=5) 368assert res 369assert ecusimSuccessfullyExecuted == True 370 371 372# 373= Positive, first part of payload succeeds, second pending, then fails, retry succeeds 374conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 375payload = b"\x00\x11\x22\x33\x44\x55\x66\x77" 376started = threading.Event() 377def ecusim(): 378 isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) 379 ack = b"\x76" 380 # second package with inscreased address 381 isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) 382 pending = GMLAN() / GMLAN_NR(requestServiceId=0x36, returnCode=0x78) 383 isotpsock2.send(pending) 384 nr = GMLAN() / GMLAN_NR(requestServiceId=0x36, returnCode=0x22) 385 isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(nr)) 386 ack = b"\x76" 387 isotpsock2.send(ack) 388 389thread = threading.Thread(target=ecusim) 390sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 391thread.start() 392started.wait(timeout=5) 393res = GMLAN_TransferData(isotpsock, 0x40000000, payload*4, maxmsglen=16, timeout=0.1, retry=1) == True 394thread.join(timeout=5) 395assert res 396 397############ 398= Positive, maxmsglen length check -> message is split automatically 399 400conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 401payload = b"\x00\x11\x22\x33\x44\x55\x66\x77" 402ecusimSuccessfullyExecuted = True 403sim_started = threading.Event() 404def ecusim(): 405 global ecusimSuccessfullyExecuted 406 ecusimSuccessfullyExecuted= True 407 requ = isotpsock2.sniff(count=1, timeout=3, started_callback=sim_started.set) 408 pkt = GMLAN() / GMLAN_TD(startingAddress=0x40000000, 409 dataRecord=payload*511+payload[:1]) 410 if len(requ) == 0 or bytes(requ[0]) != bytes(pkt): 411 ecusimSuccessfullyExecuted = False 412 return 413 ack = b"\x76" 414 # second package with inscreased address 415 requ = isotpsock2.sniff(count=1, timeout=3, started_callback=lambda: isotpsock2.send(ack)) 416 pkt = GMLAN() / GMLAN_TD(startingAddress=0x40000FF9, 417 dataRecord=payload[1:]) 418 if len(requ) == 0 or bytes(requ[0]) != bytes(pkt): 419 ecusimSuccessfullyExecuted = False 420 return 421 ack = b"\x76" 422 isotpsock2.send(ack) 423 424thread = threading.Thread(target=ecusim) 425thread.name = "EcuSimulator" + thread.name 426sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 427thread.start() 428sim_started.wait(timeout=5) 429res = GMLAN_TransferData(isotpsock, 0x40000000, payload*512, maxmsglen=0x1000000, timeout=8) == True 430thread.join(timeout=5) 431assert res 432assert ecusimSuccessfullyExecuted == True 433 434############ Address boundary checks 435= Positive, highest possible address for scheme 436conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 437payload = b"\x00\x11\x22\x33\x44\x55\x66\x77" 438started = threading.Event() 439def ecusim(): 440 isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) 441 ack = b"\x76" 442 isotpsock2.send(ack) 443 444thread = threading.Thread(target=ecusim) 445sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 446thread.start() 447started.wait(timeout=5) 448res = GMLAN_TransferData(isotpsock, 2**32 - 1, payload, timeout=1) == True 449thread.join(timeout=5) 450assert res 451 452= Negative, invalid address (too large for addressing scheme) 453conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 454payload = b"\x00\x11\x22\x33\x44\x55\x66\x77" 455started = threading.Event() 456def ecusim(): 457 isotpsock2.sniff(count=1, timeout=0.1, started_callback=started.set) 458 ack = b"\x76" 459 isotpsock2.send(ack) 460 461thread = threading.Thread(target=ecusim) 462sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 463thread.start() 464started.wait(timeout=5) 465res = GMLAN_TransferData(isotpsock, 2**32, payload, timeout=0.1) == False 466thread.join(timeout=5) 467assert res 468 469= Positive, address zero 470conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 471ecusimSuccessfullyExecuted = True 472started = threading.Event() 473def ecusim(): 474 isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) 475 ack = b"\x76" 476 isotpsock2.send(ack) 477 478thread = threading.Thread(target=ecusim) 479sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 480thread.start() 481started.wait(timeout=5) 482res = GMLAN_TransferData(isotpsock, 0x00, payload, timeout=1) == True 483thread.join(timeout=5) 484assert res 485 486= Negative, negative address 487conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 488payload = b"\x00\x11\x22\x33\x44\x55\x66\x77" 489started = threading.Event() 490def ecusim(): 491 isotpsock2.sniff(count=1, timeout=0.1, started_callback=started.set) 492 ack = b"\x76" 493 isotpsock2.send(ack) 494 495thread = threading.Thread(target=ecusim) 496sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 497thread.start() 498started.wait(timeout=5) 499res = GMLAN_TransferData(isotpsock, -1, payload, timeout=0.1) == False 500thread.join(timeout=5) 501assert res 502 503############################################ 504+ GMLAN_TransferPayload Tests 505############################################ 506= Positive, short payload 507conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 508payload = b"\x00\x11\x22\x33\x44\x55\x66\x77" 509ecusimSuccessfullyExecuted = True 510started = threading.Event() 511def ecusim(): 512 global ecusimSuccessfullyExecuted 513 ecusimSuccessfullyExecuted= True 514 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) 515 pkt = GMLAN()/GMLAN_RD(memorySize=len(payload)) 516 if bytes(requ[0]) != bytes(pkt): 517 ecusimSuccessfullyExecuted = False 518 ack = b"\x74" 519 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) 520 pkt = GMLAN() / GMLAN_TD(startingAddress=0x40000000, 521 dataRecord=payload) 522 if bytes(requ[0]) != bytes(pkt): 523 ecusimSuccessfullyExecuted = False 524 ack = b"\x76" 525 isotpsock2.send(ack) 526 527thread = threading.Thread(target=ecusim) 528sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 529thread.start() 530started.wait(timeout=5) 531res = GMLAN_TransferPayload(isotpsock, 0x40000000, payload, timeout=1) == True 532thread.join(timeout=5) 533assert res 534assert ecusimSuccessfullyExecuted == True 535 536 537############################################ 538+ GMLAN_GetSecurityAccess Tests 539############################################ 540= KeyFunction 541keyfunc = lambda seed : seed - 0x1FBE 542 543= Positive scenario, level 1, tests if keyfunction applied properly 544ecusimSuccessfullyExecuted = True 545started = threading.Event() 546def ecusim(): 547 global ecusimSuccessfullyExecuted 548 ecusimSuccessfullyExecuted= True 549 # wait for request 550 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) 551 pkt = GMLAN()/GMLAN_SA(subfunction=1) 552 if bytes(requ[0]) != bytes(pkt): 553 ecusimSuccessfullyExecuted = False 554 seedmsg = GMLAN()/GMLAN_SAPR(subfunction=1, securitySeed=0xdead) 555 # wait for key 556 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(seedmsg)) 557 pkt = GMLAN()/GMLAN_SA(subfunction=2, securityKey=0xbeef) 558 if bytes(requ[0]) != bytes(pkt): 559 ecusimSuccessfullyExecuted = False 560 nr = GMLAN() / GMLAN_NR(requestServiceId=0x27, returnCode=0x35) 561 isotpsock2.send(nr) 562 else: 563 pr = GMLAN()/GMLAN_SAPR(subfunction=2) 564 isotpsock2.send(pr) 565 566thread = threading.Thread(target=ecusim) 567sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 568thread.start() 569started.wait(timeout=5) 570res = GMLAN_GetSecurityAccess(isotpsock, keyfunc, level=1, timeout=0.1) == True 571thread.join(timeout=5) 572assert res 573assert ecusimSuccessfullyExecuted == True 574 575= Positive scenario, level 3 576ecusimSuccessfullyExecuted = True 577started = threading.Event() 578def ecusim(): 579 global ecusimSuccessfullyExecuted 580 ecusimSuccessfullyExecuted= True 581 # wait for request 582 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) 583 pkt = GMLAN()/GMLAN_SA(subfunction=3) 584 if bytes(requ[0]) != bytes(pkt): 585 ecusimSuccessfullyExecuted = False 586 seedmsg = GMLAN()/GMLAN_SAPR(subfunction=3, securitySeed=0xdead) 587 # wait for key 588 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(seedmsg)) 589 pkt = GMLAN()/GMLAN_SA(subfunction=4, securityKey=0xbeef) 590 if bytes(requ[0]) != bytes(pkt): 591 ecusimSuccessfullyExecuted = False 592 nr = GMLAN() / GMLAN_NR(requestServiceId=0x27, returnCode=0x35) 593 isotpsock2.send(nr) 594 else: 595 pr = GMLAN()/GMLAN_SAPR(subfunction=4) 596 isotpsock2.send(pr) 597 598thread = threading.Thread(target=ecusim) 599sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 600thread.start() 601started.wait(timeout=5) 602res = GMLAN_GetSecurityAccess(isotpsock, keyfunc, level=3, timeout=0.1) == True 603thread.join(timeout=5) 604assert res 605assert ecusimSuccessfullyExecuted == True 606 607 608= Negative scenario, invalid password 609ecusimSuccessfullyExecuted = True 610started = threading.Event() 611def ecusim(): 612 global ecusimSuccessfullyExecuted 613 ecusimSuccessfullyExecuted= True 614 # wait for request 615 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) 616 pkt = GMLAN()/GMLAN_SA(subfunction=1) 617 if bytes(requ[0]) != bytes(pkt): 618 ecusimSuccessfullyExecuted = False 619 seedmsg = GMLAN()/GMLAN_SAPR(subfunction=1, securitySeed=0xdead) 620 # wait for key 621 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(seedmsg)) 622 pkt = GMLAN()/GMLAN_SA(subfunction=2, securityKey=0xbabe) 623 if bytes(requ[0]) != bytes(pkt): 624 nr = GMLAN() / GMLAN_NR(requestServiceId=0x27, returnCode=0x35) 625 isotpsock2.send(nr) 626 else: 627 ecusimSuccessfullyExecuted = False 628 pr = GMLAN()/GMLAN_SAPR(subfunction=2) 629 isotpsock2.send(pr) 630 631thread = threading.Thread(target=ecusim) 632sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 633thread.start() 634started.wait(timeout=5) 635res = GMLAN_GetSecurityAccess(isotpsock, keyfunc, level=1, timeout=0.1) == False 636thread.join(timeout=5) 637assert res 638assert ecusimSuccessfullyExecuted == True 639 640= invalid level (not an odd number) 641assert GMLAN_GetSecurityAccess(isotpsock, keyfunc, level=2, timeout=1) == False 642 643= zero seed 644started = threading.Event() 645def ecusim(): 646 # wait for request 647 isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) 648 seedmsg = GMLAN()/GMLAN_SAPR(subfunction=1, securitySeed=0x0000) 649 isotpsock2.send(seedmsg) 650 651thread = threading.Thread(target=ecusim) 652sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 653thread.start() 654started.wait(timeout=5) 655res = GMLAN_GetSecurityAccess(isotpsock, keyfunc, level=1, timeout=0.1) == True 656thread.join(timeout=5) 657assert res 658 659############### retry 660= Positive scenario, request timeout, retry works 661started = threading.Event() 662def ecusim(): 663 # timeout 664 requ = isotpsock2.sniff(count=1, timeout=0.1, started_callback=started.set) 665 # wait for request 666 requ = isotpsock2.sniff(count=1, timeout=3) 667 seedmsg = GMLAN()/GMLAN_SAPR(subfunction=1, securitySeed=0xdead) 668 # wait for key 669 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(seedmsg)) 670 pkt = GMLAN()/GMLAN_SA(subfunction=2, securityKey=0xbeef) 671 pr = GMLAN()/GMLAN_SAPR(subfunction=2) 672 isotpsock2.send(pr) 673 674thread = threading.Thread(target=ecusim) 675sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 676thread.start() 677started.wait(timeout=5) 678res = GMLAN_GetSecurityAccess(isotpsock, keyfunc, level=1, timeout=0.1, retry=1) == True 679thread.join(timeout=5) 680assert res 681 682= Positive scenario, keysend timeout, retry works 683started = threading.Event() 684def ecusim(): 685 # wait for request 686 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) 687 seedmsg = GMLAN()/GMLAN_SAPR(subfunction=1, securitySeed=0xdead) 688 # timeout 689 requ = isotpsock2.sniff(count=1, timeout=0.1, started_callback=lambda:isotpsock2.send(seedmsg)) 690 # retry from start 691 requ = isotpsock2.sniff(count=1, timeout=3) 692 seedmsg = GMLAN()/GMLAN_SAPR(subfunction=1, securitySeed=0xdead) 693 # wait for key 694 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(seedmsg)) 695 pr = GMLAN()/GMLAN_SAPR(subfunction=2) 696 isotpsock2.send(pr) 697 698thread = threading.Thread(target=ecusim) 699sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 700thread.start() 701started.wait(timeout=5) 702res = GMLAN_GetSecurityAccess(isotpsock, keyfunc, level=1, timeout=0.1, retry=1) == True 703thread.join(timeout=5) 704assert res 705 706 707= Positive scenario, request error, retry works 708started = threading.Event() 709def ecusim(): 710 # wait for request 711 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) 712 nr = GMLAN() / GMLAN_NR(requestServiceId=0x27, returnCode=0x37) 713 # wait for request 714 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(nr)) 715 seedmsg = GMLAN()/GMLAN_SAPR(subfunction=1, securitySeed=0xdead) 716 # wait for key 717 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(seedmsg)) 718 pkt = GMLAN()/GMLAN_SA(subfunction=2, securityKey=0xbeef) 719 pr = GMLAN()/GMLAN_SAPR(subfunction=2) 720 isotpsock2.send(pr) 721 722thread = threading.Thread(target=ecusim) 723sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 724thread.start() 725started.wait(timeout=5) 726res = GMLAN_GetSecurityAccess(isotpsock, keyfunc, level=1, timeout=0.1, retry=1) == True 727thread.join(timeout=5) 728assert res 729 730 731############################################################################## 732+ GMLAN_InitDiagnostics Tests 733############################################################################## 734= sequence of the correct messages 735ecusimSuccessfullyExecuted = True 736started = threading.Event() 737def ecusim(): 738 global ecusimSuccessfullyExecuted 739 ecusimSuccessfullyExecuted= True 740 # DisableNormalCommunication 741 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) 742 pkt = GMLAN(b"\x28") 743 if bytes(requ[0]) != bytes(pkt): 744 ecusimSuccessfullyExecuted = False 745 ack = b"\x68" 746 # ReportProgrammedState 747 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) 748 pkt = GMLAN(b"\xa2") 749 if bytes(requ[0]) != bytes(pkt): 750 ecusimSuccessfullyExecuted = False 751 ack = GMLAN()/GMLAN_RPSPR(programmedState=0) 752 # ProgrammingMode requestProgramming 753 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) 754 pkt = GMLAN() / GMLAN_PM(subfunction=0x1) 755 if bytes(requ[0]) != bytes(pkt): 756 ecusimSuccessfullyExecuted = False 757 ack = GMLAN(b"\xe5") 758 # InitiateProgramming enableProgramming 759 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) 760 pkt = GMLAN() / GMLAN_PM(subfunction=0x3) 761 if bytes(requ[0]) != bytes(pkt): 762 ecusimSuccessfullyExecuted = False 763 764thread = threading.Thread(target=ecusim) 765 766sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 767thread.start() 768started.wait(timeout=5) 769res = GMLAN_InitDiagnostics(isotpsock, timeout=0.1, unittest=True) == True 770thread.join(timeout=5) 771assert res 772assert ecusimSuccessfullyExecuted == True 773 774 775= sequence of the correct messages, disablenormalcommunication as broadcast 776ecusimSuccessfullyExecuted = True 777started = threading.Event() 778 779broadcastsender = TestSocket(CAN) 780broadcastrcv = TestSocket(CAN) 781broadcastsender.pair(broadcastrcv) 782def ecusim(): 783 global ecusimSuccessfullyExecuted 784 ecusimSuccessfullyExecuted= True 785 print("DisableNormalCommunication") 786 requ = broadcastrcv.sniff(count=1, timeout=2, started_callback=started.set) 787 assert len(requ) >= 1 788 if bytes(requ[0].data)[0:3] != b"\xfe\x01\x28": 789 ecusimSuccessfullyExecuted = False 790 print("ReportProgrammedState") 791 requ = isotpsock2.sniff(count=1, timeout=2) 792 pkt = GMLAN(b"\xa2") 793 if bytes(requ[0]) != bytes(pkt): 794 ecusimSuccessfullyExecuted = False 795 ack = GMLAN()/GMLAN_RPSPR(programmedState=0) 796 print("ProgrammingMode requestProgramming") 797 requ = isotpsock2.sniff(count=1, timeout=3, started_callback=lambda: isotpsock2.send(ack)) 798 pkt = GMLAN() / GMLAN_PM(subfunction=0x1) 799 if bytes(requ[0]) != bytes(pkt): 800 ecusimSuccessfullyExecuted = False 801 ack = GMLAN(b"\xe5") 802 print("InitiateProgramming enableProgramming") 803 requ = isotpsock2.sniff(count=1, timeout=3, started_callback=lambda: isotpsock2.send(ack)) 804 pkt = GMLAN() / GMLAN_PM(subfunction=0x3) 805 if bytes(requ[0]) != bytes(pkt): 806 ecusimSuccessfullyExecuted = False 807 808thread = threading.Thread(target=ecusim) 809sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 810thread.start() 811started.wait(timeout=5) 812res = GMLAN_InitDiagnostics(isotpsock, broadcast_socket=GMLAN_BroadcastSocket(broadcastsender), timeout=5, unittest=True) == True 813thread.join(timeout=5) 814assert res 815assert ecusimSuccessfullyExecuted == True 816 817 818######## timeout 819= timeout DisableNormalCommunication 820ecusimSuccessfullyExecuted = True 821started = threading.Event() 822def ecusim(): 823 global ecusimSuccessfullyExecuted 824 ecusimSuccessfullyExecuted= True 825 # DisableNormalCommunication 826 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) 827 pkt = GMLAN(b"\x28") 828 if bytes(requ[0]) != bytes(pkt): 829 ecusimSuccessfullyExecuted = False 830 831thread = threading.Thread(target=ecusim) 832sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 833thread.start() 834started.wait(timeout=5) 835res = GMLAN_InitDiagnostics(isotpsock, timeout=0.1, unittest=True) == False 836thread.join(timeout=5) 837assert res 838assert ecusimSuccessfullyExecuted == True 839 840 841= timeout ReportProgrammedState 842ecusimSuccessfullyExecuted = True 843started = threading.Event() 844def ecusim(): 845 global ecusimSuccessfullyExecuted 846 ecusimSuccessfullyExecuted= True 847 # DisableNormalCommunication 848 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) 849 pkt = GMLAN(b"\x28") 850 if bytes(requ[0]) != bytes(pkt): 851 ecusimSuccessfullyExecuted = False 852 ack = b"\x68" 853 # ReportProgrammedState 854 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) 855 pkt = GMLAN(b"\xa2") 856 if bytes(requ[0]) != bytes(pkt): 857 ecusimSuccessfullyExecuted = False 858 ack = GMLAN()/GMLAN_RPSPR(programmedState=0) 859 isotpsock2.send(ack) 860 861thread = threading.Thread(target=ecusim) 862sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 863thread.start() 864started.wait(timeout=5) 865res = GMLAN_InitDiagnostics(isotpsock, timeout=0.1, unittest=True) == False 866thread.join(timeout=5) 867assert res 868assert ecusimSuccessfullyExecuted == True 869 870 871= timeout ProgrammingMode requestProgramming 872ecusimSuccessfullyExecuted = True 873started = threading.Event() 874def ecusim(): 875 global ecusimSuccessfullyExecuted 876 ecusimSuccessfullyExecuted= True 877 # DisableNormalCommunication 878 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) 879 pkt = GMLAN(b"\x28") 880 if bytes(requ[0]) != bytes(pkt): 881 ecusimSuccessfullyExecuted = False 882 ack = b"\x68" 883 # ReportProgrammedState 884 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) 885 pkt = GMLAN(b"\xa2") 886 if bytes(requ[0]) != bytes(pkt): 887 ecusimSuccessfullyExecuted = False 888 ack = GMLAN()/GMLAN_RPSPR(programmedState=0) 889 # ProgrammingMode requestProgramming 890 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) 891 pkt = GMLAN() / GMLAN_PM(subfunction=0x1) 892 if bytes(requ[0]) != bytes(pkt): 893 ecusimSuccessfullyExecuted = False 894 895thread = threading.Thread(target=ecusim) 896sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 897thread.start() 898started.wait(timeout=5) 899res = GMLAN_InitDiagnostics(isotpsock, timeout=0.1, unittest=True) == False 900thread.join(timeout=5) 901assert res 902assert ecusimSuccessfullyExecuted == True 903 904###### negative response 905= timeout DisableNormalCommunication 906ecusimSuccessfullyExecuted = True 907started = threading.Event() 908def ecusim(): 909 global ecusimSuccessfullyExecuted 910 ecusimSuccessfullyExecuted= True 911 # DisableNormalCommunication 912 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) 913 pkt = GMLAN(b"\x28") 914 if bytes(requ[0]) != bytes(pkt): 915 ecusimSuccessfullyExecuted = False 916 ack = GMLAN() / GMLAN_NR(requestServiceId=0x28, returnCode=0x12) 917 isotpsock2.send(ack) 918 919thread = threading.Thread(target=ecusim) 920 921sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 922thread.start() 923started.wait(timeout=5) 924res = GMLAN_InitDiagnostics(isotpsock, timeout=0.1, unittest=True) == False 925thread.join(timeout=5) 926assert res 927assert ecusimSuccessfullyExecuted == True 928 929###### retry tests 930= sequence of the correct messages, retry set 931started = threading.Event() 932def ecusim(): 933 # DisableNormalCommunication 934 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) 935 ack = b"\x68" 936 # ReportProgrammedState 937 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) 938 ack = GMLAN()/GMLAN_RPSPR(programmedState=0) 939 # ProgrammingMode requestProgramming 940 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) 941 ack = GMLAN(b"\xe5") 942 # InitiateProgramming enableProgramming 943 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) 944 945thread = threading.Thread(target=ecusim) 946sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 947thread.start() 948started.wait(timeout=5) 949res = GMLAN_InitDiagnostics(isotpsock, timeout=1, retry=0, unittest=True) == True 950assert res 951thread.join(timeout=5) 952 953 954= negative response, make sure no retries are made 955ecusimSuccessfullyExecuted = True 956started = threading.Event() 957def ecusim(): 958 global ecusimSuccessfullyExecuted 959 ecusimSuccessfullyExecuted= True 960 requ = isotpsock2.sniff(count=1, timeout=0.1, started_callback=started.set) 961 ack = GMLAN() / GMLAN_NR(requestServiceId=0x28, returnCode=0x12) 962 requ = isotpsock2.sniff(count=1, timeout=0.1, started_callback=lambda:isotpsock2.send(ack)) 963 if len(requ) != 0: 964 ecusimSuccessfullyExecuted = False 965 966thread = threading.Thread(target=ecusim) 967sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 968thread.start() 969started.wait(timeout=5) 970res = GMLAN_InitDiagnostics(isotpsock, timeout=0.1, retry=0, unittest=True) == False 971thread.join(timeout=5) 972assert res 973assert ecusimSuccessfullyExecuted == True 974 975 976= first fail at DisableNormalCommunication, then sequence of the correct messages 977started = threading.Event() 978def ecusim(): 979 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) 980 ack = GMLAN() / GMLAN_NR(requestServiceId=0x28, returnCode=0x12) 981 # DisableNormalCommunication 982 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) 983 ack = b"\x68" 984 # ReportProgrammedState 985 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) 986 ack = GMLAN()/GMLAN_RPSPR(programmedState=0) 987 # ProgrammingMode requestProgramming 988 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) 989 ack = GMLAN(b"\xe5") 990 # InitiateProgramming enableProgramming 991 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) 992 993thread = threading.Thread(target=ecusim) 994sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 995thread.start() 996started.wait(timeout=5) 997res = GMLAN_InitDiagnostics(isotpsock, timeout=0.1, retry=1, unittest=True) == True 998thread.join(timeout=5) 999assert res 1000 1001= first fail at ReportProgrammedState, then sequence of the correct messages 1002started = threading.Event() 1003def ecusim(): 1004 # DisableNormalCommunication 1005 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) 1006 ack = b"\x68" 1007 # Fail 1008 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) 1009 ack = GMLAN() / GMLAN_NR(requestServiceId=0xA2, returnCode=0x12) 1010 # DisableNormalCommunication 1011 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) 1012 ack = b"\x68" 1013 # ReportProgrammedState 1014 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) 1015 ack = GMLAN()/GMLAN_RPSPR(programmedState=0) 1016 # ProgrammingMode requestProgramming 1017 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) 1018 ack = GMLAN(b"\xe5") 1019 # InitiateProgramming enableProgramming 1020 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) 1021 1022thread = threading.Thread(target=ecusim) 1023sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 1024thread.start() 1025started.wait(timeout=5) 1026res = GMLAN_InitDiagnostics(isotpsock, timeout=1, retry=1, unittest=True) == True 1027thread.join(timeout=5) 1028assert res 1029 1030= first fail at ProgrammingMode requestProgramming, then sequence of the correct messages 1031started = threading.Event() 1032def ecusim(): 1033 # DisableNormalCommunication 1034 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) 1035 ack = b"\x68" 1036 # ReportProgrammedState 1037 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) 1038 ack = GMLAN()/GMLAN_RPSPR(programmedState=0) 1039 # Fail 1040 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) 1041 ack = GMLAN() / GMLAN_NR(requestServiceId=0xA5, returnCode=0x12) 1042 # DisableNormalCommunication 1043 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) 1044 ack = b"\x68" 1045 # ReportProgrammedState 1046 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) 1047 ack = GMLAN()/GMLAN_RPSPR(programmedState=0) 1048 # ProgrammingMode requestProgramming 1049 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) 1050 ack = GMLAN(b"\xe5") 1051 isotpsock2.send(ack) 1052 # InitiateProgramming enableProgramming 1053 requ = isotpsock2.sniff(count=1, timeout=1) 1054 1055thread = threading.Thread(target=ecusim) 1056sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 1057thread.start() 1058started.wait(timeout=5) 1059res = GMLAN_InitDiagnostics(isotpsock, timeout=1, retry=1, unittest=True) == True 1060thread.join(timeout=5) 1061assert res 1062 1063= fail twice 1064ecusimSuccessfullyExecuted = True 1065started = threading.Event() 1066def ecusim(): 1067 global ecusimSuccessfullyExecuted 1068 ecusimSuccessfullyExecuted= True 1069 requ = isotpsock2.sniff(count=1, timeout=0.1, started_callback=started.set) 1070 ack = GMLAN() / GMLAN_NR(requestServiceId=0x28, returnCode=0x12) 1071 requ = isotpsock2.sniff(count=1, timeout=0.1, started_callback=lambda:isotpsock2.send(ack)) 1072 ack = GMLAN() / GMLAN_NR(requestServiceId=0x28, returnCode=0x12) 1073 requ = isotpsock2.sniff(count=1, timeout=0.1, started_callback=lambda:isotpsock2.send(ack)) 1074 if len(requ) != 0: 1075 ecusimSuccessfullyExecuted = False 1076 1077thread = threading.Thread(target=ecusim) 1078sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 1079thread.start() 1080started.wait(timeout=5) 1081res = GMLAN_InitDiagnostics(isotpsock, timeout=0.1, retry=1, unittest=True) == False 1082thread.join(timeout=5) 1083assert res 1084assert ecusimSuccessfullyExecuted == True 1085 1086############################################################################## 1087+ GMLAN_ReadMemoryByAddress Tests 1088############################################################################## 1089= Positive, short length, scheme = 4 1090conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 1091payload = b"\x00\x11\x22\x33\x44\x55\x66\x77" 1092ecusimSuccessfullyExecuted = True 1093started = threading.Event() 1094def ecusim(): 1095 global ecusimSuccessfullyExecuted 1096 ecusimSuccessfullyExecuted= True 1097 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) 1098 pkt = GMLAN() / GMLAN_RMBA(memoryAddress=0x0, memorySize=0x8) 1099 if bytes(requ[0]) != bytes(pkt): 1100 ecusimSuccessfullyExecuted = False 1101 ack = GMLAN() / GMLAN_RMBAPR(memoryAddress=0x0, dataRecord=payload) 1102 isotpsock2.send(ack) 1103 1104thread = threading.Thread(target=ecusim) 1105sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 1106thread.start() 1107started.wait(timeout=5) 1108res = GMLAN_ReadMemoryByAddress(isotpsock, 0x0, 0x8, timeout=1) == payload 1109thread.join(timeout=5) 1110assert res 1111assert ecusimSuccessfullyExecuted == True 1112 1113 1114= Negative, negative response 1115conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 1116payload = b"\x00\x11\x22\x33\x44\x55\x66\x77" 1117started = threading.Event() 1118def ecusim(): 1119 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) 1120 ack = GMLAN() / GMLAN_NR(requestServiceId=0x23, returnCode=0x31) 1121 isotpsock2.send(ack) 1122 1123thread = threading.Thread(target=ecusim) 1124sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 1125thread.start() 1126started.wait(timeout=5) 1127res = GMLAN_ReadMemoryByAddress(isotpsock, 0x0, 0x8, timeout=1) is None 1128thread.join(timeout=5) 1129assert res 1130 1131= Negative, timeout 1132assert GMLAN_ReadMemoryByAddress(isotpsock, 0x0, 0x8, timeout=0.01) is None 1133 1134###### RETRY 1135= Positive, negative response, retry succeeds 1136conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 1137payload = b"\x00\x11\x22\x33\x44\x55\x66\x77" 1138started = threading.Event() 1139def ecusim(): 1140 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) 1141 ack = GMLAN() / GMLAN_NR(requestServiceId=0x23, returnCode=0x31) 1142 requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) 1143 ack = GMLAN() / GMLAN_RMBAPR(memoryAddress=0x0, dataRecord=payload) 1144 isotpsock2.send(ack) 1145 1146thread = threading.Thread(target=ecusim) 1147sniff(timeout=0.01, opened_socket=[isotpsock, isotpsock2]) 1148thread.start() 1149started.wait(timeout=5) 1150res = GMLAN_ReadMemoryByAddress(isotpsock, 0x0, 0x8, timeout=1, retry=1) == payload 1151thread.join(timeout=5) 1152assert res 1153 1154+ Cleanup 1155 1156= Delete TestSockets 1157 1158cleanup_testsockets() 1159