1% Regression tests for EcuAnsweringMachine 2 3+ Configuration 4~ conf 5 6= Imports 7 8from test.testsocket import TestSocket, cleanup_testsockets 9 10############ 11############ 12+ Load general modules 13 14= Load contribution layer 15 16load_contrib("automotive.uds", globals_dict=globals()) 17load_contrib("automotive.ecu", globals_dict=globals()) 18load_contrib("automotive.uds_ecu_states", globals_dict=globals()) 19 20ecu = TestSocket(UDS) 21tester = TestSocket(UDS) 22ecu.pair(tester) 23 24+ Simulator tests 25 26= Simple check with RDBI and Negative Response 27 28example_responses = \ 29 [EcuResponse([EcuState(session=1)], responses=UDS() / UDS_RDBIPR(dataIdentifier=0x1234) / Raw(b"deadbeef"))] 30 31success = False 32answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=UDS, verbose=False) 33sim = threading.Thread(target=answering_machine, kwargs={'timeout': 60, 'stop_filter': lambda p: p.service==0xff}) 34sim.start() 35try: 36 resp = tester.sr1(UDS()/UDS_RDBI(identifiers=[0x123]), timeout=1, verbose=False) 37 assert resp.negativeResponseCode == 0x10 38 assert resp.requestServiceId == 34 39 resp = tester.sr1(UDS(service=0x22), timeout=1, verbose=False) 40 assert resp.negativeResponseCode == 0x10 41 assert resp.requestServiceId == 34 42 resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[0x1234]), timeout=1, verbose=False) 43 assert resp.service == 0x62 44 assert resp.dataIdentifier == 0x1234 45 assert resp.load == b"deadbeef" 46 success = True 47except Exception as ex: 48 print(ex) 49finally: 50 tester.send(UDS(service=0xff)) 51 sim.join(timeout=10) 52 53assert success 54 55= Simple check with different Sessions 56 57example_responses = \ 58 [EcuResponse(EcuState(session=2), responses=UDS() / UDS_RDBIPR(dataIdentifier=2) / Raw(b"deadbeef1")), 59 EcuResponse(EcuState(session=[3, 4]), responses=UDS() / UDS_RDBIPR(dataIdentifier=3) / Raw(b"deadbeef2")), 60 EcuResponse(EcuState(session=[5, 6, 7]), responses=UDS() / UDS_RDBIPR(dataIdentifier=5) / Raw(b"deadbeef3")), 61 EcuResponse(EcuState(session=[8, 9]), responses=UDS() / UDS_RDBIPR(dataIdentifier=9) / Raw(b"deadbeef4"))] 62 63success = False 64 65answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=UDS) 66sim = threading.Thread(target=answering_machine, kwargs={'timeout': 60, 'stop_filter': lambda p: p.service==0xff}) 67sim.start() 68try: 69 resp = tester.sr1(UDS()/UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) 70 assert resp.negativeResponseCode == 0x10 71 assert resp.requestServiceId == 34 72 answering_machine.state.session = 2 73 resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) 74 assert resp.service == 0x62 75 assert resp.dataIdentifier == 2 76 assert resp.load == b"deadbeef1" 77 answering_machine.state.session = 4 78 resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) 79 assert resp.service == 0x62 80 assert resp.dataIdentifier == 3 81 assert resp.load == b"deadbeef2" 82 answering_machine.state.session = 6 83 resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) 84 assert resp.service == 0x62 85 assert resp.dataIdentifier == 5 86 assert resp.load == b"deadbeef3" 87 answering_machine.state.session = 9 88 resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) 89 assert resp.service == 0x62 90 assert resp.dataIdentifier == 9 91 assert resp.load == b"deadbeef4" 92 success = True 93except Exception as ex: 94 print(ex) 95finally: 96 tester.send(UDS(service=0xff)) 97 sim.join(timeout=10) 98 99assert success 100 101= Simple check with different Sessions and diagnosticSessionControl 102 103example_responses = \ 104 [EcuResponse(EcuState(session=2), responses=UDS() / UDS_RDBIPR(dataIdentifier=2) / Raw(b"deadbeef1")), 105 EcuResponse(EcuState(session=range(3,5)), responses=UDS() / UDS_RDBIPR(dataIdentifier=3) / Raw(b"deadbeef2")), 106 EcuResponse(EcuState(session=[5,6,7]), responses=UDS() / UDS_RDBIPR(dataIdentifier=5) / Raw(b"deadbeef3")), 107 EcuResponse(EcuState(session=9), responses=UDS() / UDS_RDBIPR(dataIdentifier=9) / Raw(b"deadbeef4")), 108 EcuResponse([EcuState(), EcuState(session=range(0,8))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=1, sessionParameterRecord=b"dead")), 109 EcuResponse([EcuState(), EcuState(session=range(0,8))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=2, sessionParameterRecord=b"dead")), 110 EcuResponse([EcuState(), EcuState(session=range(0,8))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=3, sessionParameterRecord=b"dead")), 111 EcuResponse([EcuState(), EcuState(session=range(0,8))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=4, sessionParameterRecord=b"dead")), 112 EcuResponse([EcuState(), EcuState(session=range(0,8))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=5, sessionParameterRecord=b"dead")), 113 EcuResponse([EcuState(), EcuState(session=range(0,8))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=6, sessionParameterRecord=b"dead")), 114 EcuResponse([EcuState(), EcuState(session=range(0,8))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=7, sessionParameterRecord=b"dead")), 115 EcuResponse([EcuState(), EcuState(session=range(0,8))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=8, sessionParameterRecord=b"dead")), 116 EcuResponse([EcuState(), EcuState(session=range(8,10))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=9, sessionParameterRecord=b"dead1")), 117 EcuResponse([EcuState(), EcuState(session=range(8,10))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=9, sessionParameterRecord=b"dead2")), 118 EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x7f, requestServiceId=0x10))] 119 120success = False 121 122answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=UDS) 123sim = threading.Thread(target=answering_machine, kwargs={'timeout': 60, 'stop_filter': lambda p: p.service==0xff}) 124sim.start() 125try: 126 resp = tester.sr1(UDS()/UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) 127 assert resp.negativeResponseCode == 0x10 128 assert resp.requestServiceId == 34 129 resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=2), timeout=1, verbose=False) 130 assert resp.service == 0x50 131 assert resp.diagnosticSessionType == 2 132 assert resp.sessionParameterRecord == b"dead" 133 resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) 134 assert resp.service == 0x62 135 assert resp.dataIdentifier == 2 136 assert resp.load == b"deadbeef1" 137 resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=4), timeout=1, verbose=False) 138 assert resp.service == 0x50 139 assert resp.diagnosticSessionType == 4 140 assert resp.sessionParameterRecord == b"dead" 141 resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) 142 assert resp.service == 0x62 143 assert resp.dataIdentifier == 3 144 assert resp.load == b"deadbeef2" 145 resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=6), timeout=1, verbose=False) 146 assert resp.service == 0x50 147 assert resp.diagnosticSessionType == 6 148 assert resp.sessionParameterRecord == b"dead" 149 resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) 150 assert resp.service == 0x62 151 assert resp.dataIdentifier == 5 152 assert resp.load == b"deadbeef3" 153 resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=8), timeout=1, verbose=False) 154 assert resp.service == 0x50 155 assert resp.diagnosticSessionType == 8 156 assert resp.sessionParameterRecord == b"dead" 157 resp = tester.sr1(UDS() / UDS_DSC(diagnosticSessionType=9), timeout=1, verbose=False) 158 assert resp.service == 0x50 159 assert resp.diagnosticSessionType == 9 160 assert resp.sessionParameterRecord == b"dead1" 161 resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) 162 assert resp.service == 0x62 163 assert resp.dataIdentifier == 9 164 assert resp.load == b"deadbeef4" 165 success = True 166except Exception as ex: 167 print(ex) 168finally: 169 tester.send(UDS(service=0xff)) 170 sim.join(timeout=10) 171 172assert success 173 174= Simple check with different Sessions and diagnosticSessionControl and answers hook 175 176def custom_answers(resp, req): 177 if req.service + 0x40 != resp.service: 178 return False 179 if hasattr(req, "diagnosticSessionType"): 180 if 0 < req.diagnosticSessionType <= 8: 181 resp.diagnosticSessionType = req.diagnosticSessionType 182 return resp.answers(req) 183 return False 184 185example_responses = \ 186 [EcuResponse(EcuState(session=2), responses=UDS() / UDS_RDBIPR(dataIdentifier=2) / Raw(b"deadbeef1")), 187 EcuResponse(EcuState(session=range(3,5)), responses=UDS() / UDS_RDBIPR(dataIdentifier=3) / Raw(b"deadbeef2")), 188 EcuResponse(EcuState(session=[5,6,7]), responses=UDS() / UDS_RDBIPR(dataIdentifier=5) / Raw(b"deadbeef3")), 189 EcuResponse(EcuState(session=[9, 10]), responses=UDS() / UDS_RDBIPR(dataIdentifier=9) / Raw(b"deadbeef4")), 190 EcuResponse(EcuState(session=range(0,8)), responses=UDS() / UDS_DSCPR(diagnosticSessionType=1, sessionParameterRecord=b"dead"), answers=custom_answers), 191 EcuResponse(EcuState(session=range(8,10)), responses=UDS() / UDS_DSCPR(diagnosticSessionType=9, sessionParameterRecord=b"dead1")), 192 EcuResponse(EcuState(session=range(8,10)), responses=UDS() / UDS_DSCPR(diagnosticSessionType=9, sessionParameterRecord=b"dead2")), 193 EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x7f, requestServiceId=0x10))] 194 195success = False 196 197answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=UDS) 198sim = threading.Thread(target=answering_machine, kwargs={'timeout': 60, 'stop_filter': lambda p: p.service==0xff}) 199sim.start() 200try: 201 resp = tester.sr1(UDS()/UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) 202 assert resp.negativeResponseCode == 0x10 203 assert resp.requestServiceId == 34 204 resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=2), timeout=1, verbose=False) 205 assert resp.service == 0x50 206 assert resp.diagnosticSessionType == 2 207 assert resp.sessionParameterRecord == b"dead" 208 resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) 209 assert resp.service == 0x62 210 assert resp.dataIdentifier == 2 211 assert resp.load == b"deadbeef1" 212 resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=4), timeout=1, verbose=False) 213 assert resp.service == 0x50 214 assert resp.diagnosticSessionType == 4 215 assert resp.sessionParameterRecord == b"dead" 216 resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) 217 assert resp.service == 0x62 218 assert resp.dataIdentifier == 3 219 assert resp.load == b"deadbeef2" 220 resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=6), timeout=1, verbose=False) 221 assert resp.service == 0x50 222 assert resp.diagnosticSessionType == 6 223 assert resp.sessionParameterRecord == b"dead" 224 resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) 225 assert resp.service == 0x62 226 assert resp.dataIdentifier == 5 227 assert resp.load == b"deadbeef3" 228 resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=8), timeout=1, verbose=False) 229 assert resp.service == 0x50 230 assert resp.diagnosticSessionType == 8 231 assert resp.sessionParameterRecord == b"dead" 232 resp = tester.sr1(UDS() / UDS_DSC(diagnosticSessionType=9), timeout=1, verbose=False) 233 assert resp.service == 0x50 234 assert resp.diagnosticSessionType == 9 235 assert resp.sessionParameterRecord == b"dead1" 236 resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) 237 assert resp.service == 0x62 238 assert resp.dataIdentifier == 9 239 assert resp.load == b"deadbeef4" 240 success = True 241except Exception as ex: 242 print(ex) 243finally: 244 tester.send(UDS(service=0xff)) 245 sim.join(timeout=10) 246 247assert success 248 249= Simple check with security access and answers hook 250 251security_seed = b"abcd" 252 253def custom_answers(resp, req): 254 global security_seed 255 if req.service + 0x40 != resp.service or req.service != 0x27: 256 return False 257 if req.securityAccessType == 1: 258 resp.securitySeed = security_seed 259 return resp.answers(req) 260 elif req.securityAccessType == 2: 261 return resp.answers(req) and req.securityKey == security_seed + security_seed 262 return False 263 264example_responses = \ 265 [EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_SAPR(securityAccessType=1, securitySeed=b"1234"), answers=custom_answers), 266 EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_SAPR(securityAccessType=2), answers=custom_answers), 267 EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x35, requestServiceId=0x27)), 268 EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x7f, requestServiceId=0x10))] 269 270success = False 271 272answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=UDS) 273sim = threading.Thread(target=answering_machine, kwargs={'timeout': 10, 'stop_filter': lambda p: p.service==0xff}) 274sim.start() 275try: 276 resp = tester.sr1(UDS() / UDS_SA(securityAccessType=1), timeout=1, verbose=False) 277 assert resp.service == 0x67 278 assert resp.securitySeed == b"abcd" 279 resp = tester.sr1(UDS() / UDS_SA(securityAccessType=2, securityKey=resp.securitySeed), timeout=1, verbose=False) 280 assert resp.service == 0x7f 281 assert resp.negativeResponseCode == 0x35 282 resp = tester.sr1(UDS() / UDS_SA(securityAccessType=1), timeout=1, verbose=False) 283 assert resp.service == 0x67 284 assert resp.securitySeed == b"abcd" 285 resp = tester.sr1(UDS() / UDS_SA(securityAccessType=2, securityKey=resp.securitySeed+resp.securitySeed), timeout=1, verbose=False) 286 assert resp.service == 0x67 287 success = True 288except Exception as ex: 289 print(ex) 290finally: 291 tester.send(UDS(service=0xff)) 292 sim.join(timeout=10) 293 294assert success 295 296= Simple check with security access and answers hook and request-correctly-received message 297 298security_seed = b"abcd" 299 300def custom_answers(resp, req): 301 global security_seed 302 if req.service + 0x40 != resp.service or req.service != 0x27: 303 return False 304 if req.securityAccessType == 1: 305 resp.securitySeed = security_seed 306 return resp.answers(req) 307 elif req.securityAccessType == 2: 308 return resp.answers(req) and req.securityKey == security_seed + security_seed 309 return False 310 311example_responses = \ 312 [EcuResponse(EcuState(session=range(0,255)), responses=[UDS()/UDS_NR(negativeResponseCode=0x78, requestServiceId=0x27), UDS() / UDS_SAPR(securityAccessType=1, securitySeed=b"1234")], answers=custom_answers), 313 EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_SAPR(securityAccessType=2), answers=custom_answers), 314 EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x35, requestServiceId=0x27)), 315 EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x7f, requestServiceId=0x10))] 316 317success = False 318 319answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=UDS) 320sim = threading.Thread(target=answering_machine, kwargs={'timeout': 10, 'stop_filter': lambda p: p.service==0xff}) 321sim.start() 322try: 323 resp = tester.sr1(UDS() / UDS_SA(securityAccessType=1), timeout=2, verbose=False) 324 assert resp.service == 0x67 325 assert resp.securitySeed == b"abcd" 326 resp = tester.sr1(UDS() / UDS_SA(securityAccessType=2, securityKey=resp.securitySeed), timeout=2, verbose=False) 327 assert resp.service == 0x7f 328 assert resp.negativeResponseCode == 0x35 329 resp = tester.sr1(UDS() / UDS_SA(securityAccessType=1), timeout=2, verbose=False) 330 assert resp.service == 0x67 331 assert resp.securitySeed == b"abcd" 332 resp = tester.sr1(UDS() / UDS_SA(securityAccessType=2, securityKey=resp.securitySeed+resp.securitySeed), timeout=2, verbose=False) 333 assert resp.service == 0x67 334 success = True 335except Exception as ex: 336 print(ex) 337finally: 338 tester.send(UDS(service=0xff)) 339 sim.join(timeout=10) 340 341assert success 342 343= Simple check with security access and answers hook and request-correctly-received message 2 344 345security_seed = b"abcd" 346 347def custom_answers(resp, req): 348 global security_seed 349 if req.service + 0x40 != resp.service or req.service != 0x27: 350 return False 351 if req.securityAccessType == 1: 352 resp.securitySeed = security_seed 353 return resp.answers(req) 354 elif req.securityAccessType == 2: 355 return resp.answers(req) and req.securityKey == security_seed + security_seed 356 return False 357 358example_responses = \ 359 [EcuResponse(EcuState(session=range(0,255)), responses=[UDS()/UDS_NR(negativeResponseCode=0x78, requestServiceId=0x27), UDS() / UDS_SAPR(securityAccessType=1, securitySeed=b"1234")], answers=custom_answers), 360 EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_SAPR(securityAccessType=2), answers=custom_answers), 361 EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x35, requestServiceId=0x27)), 362 EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x7f, requestServiceId=0x10))] 363 364conf.contribs['UDS']['treat-response-pending-as-answer'] = True 365 366success = False 367 368answering_machine = EcuAnsweringMachine(supported_responses=example_responses, 369 main_socket=ecu, basecls=UDS) 370sim = threading.Thread(target=answering_machine, kwargs={'timeout':5, 'stop_filter': lambda p: p.service==0xff}) 371sim.start() 372try: 373 resp = tester.sr1(UDS() / UDS_SA(securityAccessType=1), timeout=1, verbose=False) 374 assert resp.service == 0x7f 375 assert resp.negativeResponseCode == 0x78 376 resp = tester.sniff(timeout=2, count=1, verbose=False)[0] 377 assert resp.service == 0x67 378 assert resp.securitySeed == b"abcd" 379 resp = tester.sr1(UDS() / UDS_SA(securityAccessType=2, securityKey=resp.securitySeed), timeout=3, verbose=False) 380 assert resp.service == 0x7f 381 assert resp.negativeResponseCode == 0x35 382 resp = tester.sr1(UDS() / UDS_SA(securityAccessType=1), timeout=1, verbose=False) 383 assert resp.service == 0x7f 384 assert resp.negativeResponseCode == 0x78 385 resp = tester.sniff(timeout=2, count=1, verbose=False)[0] 386 assert resp.service == 0x67 387 assert resp.securitySeed == b"abcd" 388 resp = tester.sr1(UDS() / UDS_SA(securityAccessType=2, securityKey=resp.securitySeed+resp.securitySeed), timeout=1, verbose=False) 389 assert resp.service == 0x67 390 success = True 391except Exception as ex: 392 print(ex) 393finally: 394 tester.send(UDS(service=0xff)) 395 sim.join(timeout=10) 396 397assert success 398 399conf.contribs['UDS']['treat-response-pending-as-answer'] = False 400 401+ Cleanup 402 403= Delete TestSockets 404 405cleanup_testsockets()