1import logging 2import subprocess 3from datetime import datetime 4 5import pytest 6from custom_logger import CustomLogger 7 8 9@pytest.fixture 10def tmp_log_file(tmp_path): 11 return tmp_path / "test_log.json" 12 13 14@pytest.fixture 15def custom_logger(tmp_log_file): 16 return CustomLogger(tmp_log_file) 17 18 19def run_script_with_args(args): 20 import custom_logger 21 22 script_path = custom_logger.__file__ 23 return subprocess.run( 24 ["python3", str(script_path), *args], capture_output=True, text=True 25 ) 26 27 28# Test case for missing log file 29@pytest.mark.parametrize( 30 "key, value", [("dut_attempt_counter", "1"), ("job_combined_status", "pass")] 31) 32def test_missing_log_file_argument(key, value): 33 result = run_script_with_args(["--update", "key", "value"]) 34 assert result.returncode != 0 35 36 37# Parametrize test case for valid update arguments 38@pytest.mark.parametrize( 39 "key, value", [("dut_attempt_counter", "1"), ("job_combined_status", "pass")] 40) 41def test_update_argument_valid(custom_logger, tmp_log_file, key, value): 42 result = run_script_with_args([str(tmp_log_file), "--update", key, value]) 43 assert result.returncode == 0 44 45 46# Test case for passing only the key without a value 47def test_update_argument_key_only(custom_logger, tmp_log_file): 48 key = "dut_attempt_counter" 49 result = run_script_with_args([str(tmp_log_file), "--update", key]) 50 assert result.returncode != 0 51 52 53# Test case for not passing any key-value pair 54def test_update_argument_no_values(custom_logger, tmp_log_file): 55 result = run_script_with_args([str(tmp_log_file), "--update"]) 56 assert result.returncode == 0 57 58 59# Parametrize test case for valid arguments 60@pytest.mark.parametrize( 61 "key, value", [("dut_attempt_counter", "1"), ("job_combined_status", "pass")] 62) 63def test_create_argument_valid(custom_logger, tmp_log_file, key, value): 64 result = run_script_with_args([str(tmp_log_file), "--create-dut-job", key, value]) 65 assert result.returncode == 0 66 67 68# Test case for passing only the key without a value 69def test_create_argument_key_only(custom_logger, tmp_log_file): 70 key = "dut_attempt_counter" 71 result = run_script_with_args([str(tmp_log_file), "--create-dut-job", key]) 72 assert result.returncode != 0 73 74 75# Test case for not passing any key-value pair 76def test_create_argument_no_values(custom_logger, tmp_log_file): 77 result = run_script_with_args([str(tmp_log_file), "--create-dut-job"]) 78 assert result.returncode == 0 79 80 81# Test case for updating a DUT job 82@pytest.mark.parametrize( 83 "key, value", [("status", "hung"), ("dut_state", "Canceling"), ("dut_name", "asus")] 84) 85def test_update_dut_job(custom_logger, tmp_log_file, key, value): 86 result = run_script_with_args([str(tmp_log_file), "--update-dut-job", key, value]) 87 assert result.returncode != 0 88 89 result = run_script_with_args([str(tmp_log_file), "--create-dut-job", key, value]) 90 assert result.returncode == 0 91 92 result = run_script_with_args([str(tmp_log_file), "--update-dut-job", key, value]) 93 assert result.returncode == 0 94 95 96# Test case for updating last DUT job 97def test_update_dut_multiple_job(custom_logger, tmp_log_file): 98 # Create the first DUT job with the first key 99 result = run_script_with_args( 100 [str(tmp_log_file), "--create-dut-job", "status", "hung"] 101 ) 102 assert result.returncode == 0 103 104 # Create the second DUT job with the second key 105 result = run_script_with_args( 106 [str(tmp_log_file), "--create-dut-job", "dut_state", "Canceling"] 107 ) 108 assert result.returncode == 0 109 110 result = run_script_with_args( 111 [str(tmp_log_file), "--update-dut-job", "dut_name", "asus"] 112 ) 113 assert result.returncode == 0 114 115 116# Parametrize test case for valid phase arguments 117@pytest.mark.parametrize( 118 "phase_name", 119 [("Phase1"), ("Phase2"), ("Phase3")], 120) 121def test_create_job_phase_valid(custom_logger, tmp_log_file, phase_name): 122 custom_logger.create_dut_job(status="pass") 123 124 result = run_script_with_args([str(tmp_log_file), "--create-job-phase", phase_name]) 125 assert result.returncode == 0 126 127 128# Test case for not passing any arguments for create-job-phase 129def test_create_job_phase_no_arguments(custom_logger, tmp_log_file): 130 custom_logger.create_dut_job(status="pass") 131 132 result = run_script_with_args([str(tmp_log_file), "--create-job-phase"]) 133 assert result.returncode != 0 134 135 136# Test case for trying to create a phase job without an existing DUT job 137def test_create_job_phase_no_dut_job(custom_logger, tmp_log_file): 138 phase_name = "Phase1" 139 140 result = run_script_with_args([str(tmp_log_file), "--create-job-phase", phase_name]) 141 assert result.returncode != 0 142 143 144# Combined test cases for valid scenarios 145def test_valid_scenarios(custom_logger, tmp_log_file): 146 valid_update_args = [("dut_attempt_counter", "1"), ("job_combined_status", "pass")] 147 for key, value in valid_update_args: 148 result = run_script_with_args([str(tmp_log_file), "--update", key, value]) 149 assert result.returncode == 0 150 151 valid_create_args = [ 152 ("status", "hung"), 153 ("dut_state", "Canceling"), 154 ("dut_name", "asus"), 155 ("phase_name", "Bootloader"), 156 ] 157 for key, value in valid_create_args: 158 result = run_script_with_args( 159 [str(tmp_log_file), "--create-dut-job", key, value] 160 ) 161 assert result.returncode == 0 162 163 result = run_script_with_args( 164 [str(tmp_log_file), "--create-dut-job", "status", "hung"] 165 ) 166 assert result.returncode == 0 167 168 result = run_script_with_args( 169 [str(tmp_log_file), "--update-dut-job", "dut_name", "asus"] 170 ) 171 assert result.returncode == 0 172 173 result = run_script_with_args( 174 [ 175 str(tmp_log_file), 176 "--create-job-phase", 177 "phase_name", 178 ] 179 ) 180 assert result.returncode == 0 181 182 183# Parametrize test case for valid update arguments 184@pytest.mark.parametrize( 185 "key, value", [("dut_attempt_counter", "1"), ("job_combined_status", "pass")] 186) 187def test_update(custom_logger, key, value): 188 custom_logger.update(**{key: value}) 189 logger_data = custom_logger.logger.data 190 191 assert key in logger_data 192 assert logger_data[key] == value 193 194 195# Test case for updating with a key that already exists 196def test_update_existing_key(custom_logger): 197 key = "status" 198 value = "new_value" 199 custom_logger.logger.data[key] = "old_value" 200 custom_logger.update(**{key: value}) 201 logger_data = custom_logger.logger.data 202 203 assert key in logger_data 204 assert logger_data[key] == value 205 206 207# Test case for updating "dut_jobs" 208def test_update_dut_jobs(custom_logger): 209 key1 = "status" 210 value1 = "fail" 211 key2 = "state" 212 value2 = "hung" 213 214 custom_logger.create_dut_job(**{key1: value1}) 215 logger_data = custom_logger.logger.data 216 217 job1 = logger_data["dut_jobs"][0] 218 assert key1 in job1 219 assert job1[key1] == value1 220 221 custom_logger.update_dut_job(key2, value2) 222 logger_data = custom_logger.logger.data 223 224 job2 = logger_data["dut_jobs"][0] 225 assert key2 in job2 226 assert job2[key2] == value2 227 228 229# Test case for creating and updating DUT job 230def test_create_dut_job(custom_logger): 231 key = "status" 232 value1 = "pass" 233 value2 = "fail" 234 value3 = "hung" 235 236 reason = "job_combined_status" 237 result = "Finished" 238 239 custom_logger.update(**{reason: result}) 240 logger_data = custom_logger.logger.data 241 242 assert reason in logger_data 243 assert logger_data[reason] == result 244 245 # Create the first DUT job 246 custom_logger.create_dut_job(**{key: value1}) 247 logger_data = custom_logger.logger.data 248 249 assert "dut_jobs" in logger_data 250 assert isinstance(logger_data["dut_jobs"], list) 251 assert len(logger_data["dut_jobs"]) == 1 252 assert isinstance(logger_data["dut_jobs"][0], dict) 253 254 # Check the values of the keys in the created first DUT job 255 job1 = logger_data["dut_jobs"][0] 256 assert key in job1 257 assert job1[key] == value1 258 259 # Create the second DUT job 260 custom_logger.create_dut_job(**{key: value2}) 261 logger_data = custom_logger.logger.data 262 263 assert "dut_jobs" in logger_data 264 assert isinstance(logger_data["dut_jobs"], list) 265 assert len(logger_data["dut_jobs"]) == 2 266 assert isinstance(logger_data["dut_jobs"][1], dict) 267 268 # Check the values of the keys in the created second DUT job 269 job2 = logger_data["dut_jobs"][1] 270 assert key in job2 271 assert job2[key] == value2 272 273 # Update the second DUT job with value3 274 custom_logger.update_dut_job(key, value3) 275 logger_data = custom_logger.logger.data 276 277 # Check the updated value in the second DUT job 278 job2 = logger_data["dut_jobs"][1] 279 assert key in job2 280 assert job2[key] == value3 281 282 # Find the index of the last DUT job 283 last_job_index = len(logger_data["dut_jobs"]) - 1 284 285 # Update the last DUT job 286 custom_logger.update_dut_job("dut_name", "asus") 287 logger_data = custom_logger.logger.data 288 289 # Check the updated value in the last DUT job 290 job2 = logger_data["dut_jobs"][last_job_index] 291 assert "dut_name" in job2 292 assert job2["dut_name"] == "asus" 293 294 # Check that "dut_name" is not present in other DUT jobs 295 for idx, job in enumerate(logger_data["dut_jobs"]): 296 if idx != last_job_index: 297 assert job.get("dut_name") == "" 298 299 300# Test case for updating with missing "dut_jobs" key 301def test_update_dut_job_missing_dut_jobs(custom_logger): 302 key = "status" 303 value = "fail" 304 305 # Attempt to update a DUT job when "dut_jobs" is missing 306 with pytest.raises(ValueError, match="No DUT jobs found."): 307 custom_logger.update_dut_job(key, value) 308 309 310# Test case for creating a job phase 311def test_create_job_phase(custom_logger): 312 custom_logger.create_dut_job(status="pass") 313 phase_name = "Phase1" 314 315 custom_logger.create_job_phase(phase_name) 316 logger_data = custom_logger.logger.data 317 318 assert "dut_jobs" in logger_data 319 assert isinstance(logger_data["dut_jobs"], list) 320 assert len(logger_data["dut_jobs"]) == 1 321 322 job = logger_data["dut_jobs"][0] 323 assert "dut_job_phases" in job 324 assert isinstance(job["dut_job_phases"], list) 325 assert len(job["dut_job_phases"]) == 1 326 327 phase = job["dut_job_phases"][0] 328 assert phase["name"] == phase_name 329 try: 330 datetime.fromisoformat(phase["start_time"]) 331 assert True 332 except ValueError: 333 assert False 334 assert phase["end_time"] == "" 335 336 337# Test case for creating multiple phase jobs 338def test_create_multiple_phase_jobs(custom_logger): 339 custom_logger.create_dut_job(status="pass") 340 341 phase_data = [ 342 { 343 "phase_name": "Phase1", 344 }, 345 { 346 "phase_name": "Phase2", 347 }, 348 { 349 "phase_name": "Phase3", 350 }, 351 ] 352 353 for data in phase_data: 354 phase_name = data["phase_name"] 355 356 custom_logger.create_job_phase(phase_name) 357 358 logger_data = custom_logger.logger.data 359 360 assert "dut_jobs" in logger_data 361 assert isinstance(logger_data["dut_jobs"], list) 362 assert len(logger_data["dut_jobs"]) == 1 363 364 job = logger_data["dut_jobs"][0] 365 assert "dut_job_phases" in job 366 assert isinstance(job["dut_job_phases"], list) 367 assert len(job["dut_job_phases"]) == len(phase_data) 368 369 for data in phase_data: 370 phase_name = data["phase_name"] 371 372 phase = job["dut_job_phases"][phase_data.index(data)] 373 374 assert phase["name"] == phase_name 375 try: 376 datetime.fromisoformat(phase["start_time"]) 377 assert True 378 except ValueError: 379 assert False 380 381 if phase_data.index(data) != len(phase_data) - 1: 382 try: 383 datetime.fromisoformat(phase["end_time"]) 384 assert True 385 except ValueError: 386 assert False 387 388 # Check if the end_time of the last phase is an empty string 389 last_phase = job["dut_job_phases"][-1] 390 assert last_phase["end_time"] == "" 391 392 393# Test case for creating multiple dut jobs and updating phase job for last dut job 394def test_create_two_dut_jobs_and_add_phase(custom_logger): 395 # Create the first DUT job 396 custom_logger.create_dut_job(status="pass") 397 398 # Create the second DUT job 399 custom_logger.create_dut_job(status="fail") 400 401 logger_data = custom_logger.logger.data 402 403 assert "dut_jobs" in logger_data 404 assert isinstance(logger_data["dut_jobs"], list) 405 assert len(logger_data["dut_jobs"]) == 2 406 407 first_dut_job = logger_data["dut_jobs"][0] 408 second_dut_job = logger_data["dut_jobs"][1] 409 410 # Add a phase to the second DUT job 411 custom_logger.create_job_phase("Phase1") 412 413 logger_data = custom_logger.logger.data 414 415 assert "dut_jobs" in logger_data 416 assert isinstance(logger_data["dut_jobs"], list) 417 assert len(logger_data["dut_jobs"]) == 2 418 419 first_dut_job = logger_data["dut_jobs"][0] 420 second_dut_job = logger_data["dut_jobs"][1] 421 422 # Check first DUT job does not have a phase 423 assert not first_dut_job.get("dut_job_phases") 424 425 # Check second DUT job has a phase 426 assert second_dut_job.get("dut_job_phases") 427 assert isinstance(second_dut_job["dut_job_phases"], list) 428 assert len(second_dut_job["dut_job_phases"]) == 1 429 430 431# Test case for updating DUT start time 432def test_update_dut_start_time(custom_logger): 433 custom_logger.create_dut_job(status="pass") 434 custom_logger.update_dut_time("start", None) 435 436 logger_data = custom_logger.logger.data 437 assert "dut_jobs" in logger_data 438 assert len(logger_data["dut_jobs"]) == 1 439 440 dut_job = logger_data["dut_jobs"][0] 441 assert "dut_start_time" in dut_job 442 assert dut_job["dut_start_time"] != "" 443 444 try: 445 datetime.fromisoformat(dut_job["dut_start_time"]) 446 assert True 447 except ValueError: 448 assert False 449 450 451# Test case for updating DUT submit time 452def test_update_dut_submit_time(custom_logger): 453 custom_time = "2023-11-09T02:37:06Z" 454 custom_logger.create_dut_job(status="pass") 455 custom_logger.update_dut_time("submit", custom_time) 456 457 logger_data = custom_logger.logger.data 458 assert "dut_jobs" in logger_data 459 assert len(logger_data["dut_jobs"]) == 1 460 461 dut_job = logger_data["dut_jobs"][0] 462 assert "dut_submit_time" in dut_job 463 464 try: 465 datetime.fromisoformat(dut_job["dut_submit_time"]) 466 assert True 467 except ValueError: 468 assert False 469 470 471# Test case for updating DUT end time 472def test_update_dut_end_time(custom_logger): 473 custom_logger.create_dut_job(status="pass") 474 custom_logger.update_dut_time("end", None) 475 476 logger_data = custom_logger.logger.data 477 assert "dut_jobs" in logger_data 478 assert len(logger_data["dut_jobs"]) == 1 479 480 dut_job = logger_data["dut_jobs"][0] 481 assert "dut_end_time" in dut_job 482 483 try: 484 datetime.fromisoformat(dut_job["dut_end_time"]) 485 assert True 486 except ValueError: 487 assert False 488 489 490# Test case for updating DUT time with invalid value 491def test_update_dut_time_invalid_value(custom_logger): 492 custom_logger.create_dut_job(status="pass") 493 with pytest.raises( 494 ValueError, 495 match="Error: Invalid argument provided for --update-dut-time. Use 'start', 'submit', 'end'.", 496 ): 497 custom_logger.update_dut_time("invalid_value", None) 498 499 500# Test case for close_dut_job 501def test_close_dut_job(custom_logger): 502 custom_logger.create_dut_job(status="pass") 503 504 custom_logger.create_job_phase("Phase1") 505 custom_logger.create_job_phase("Phase2") 506 507 custom_logger.close_dut_job() 508 509 logger_data = custom_logger.logger.data 510 assert "dut_jobs" in logger_data 511 assert len(logger_data["dut_jobs"]) == 1 512 513 dut_job = logger_data["dut_jobs"][0] 514 assert "dut_job_phases" in dut_job 515 dut_job_phases = dut_job["dut_job_phases"] 516 517 phase1 = dut_job_phases[0] 518 assert phase1["name"] == "Phase1" 519 520 try: 521 datetime.fromisoformat(phase1["start_time"]) 522 assert True 523 except ValueError: 524 assert False 525 526 try: 527 datetime.fromisoformat(phase1["end_time"]) 528 assert True 529 except ValueError: 530 assert False 531 532 phase2 = dut_job_phases[1] 533 assert phase2["name"] == "Phase2" 534 535 try: 536 datetime.fromisoformat(phase2["start_time"]) 537 assert True 538 except ValueError: 539 assert False 540 541 try: 542 datetime.fromisoformat(phase2["end_time"]) 543 assert True 544 except ValueError: 545 assert False 546 547 548# Test case for close 549def test_close(custom_logger): 550 custom_logger.create_dut_job(status="pass") 551 552 custom_logger.close() 553 554 logger_data = custom_logger.logger.data 555 assert "dut_jobs" in logger_data 556 assert len(logger_data["dut_jobs"]) == 1 557 assert "dut_attempt_counter" in logger_data 558 assert logger_data["dut_attempt_counter"] == len(logger_data["dut_jobs"]) 559 assert "job_combined_status" in logger_data 560 assert logger_data["job_combined_status"] != "" 561 562 dut_job = logger_data["dut_jobs"][0] 563 assert "submitter_end_time" in dut_job 564 try: 565 datetime.fromisoformat(dut_job["submitter_end_time"]) 566 assert True 567 except ValueError: 568 assert False 569 570 571# Test case for updating status to fail with a reason 572def test_update_status_fail_with_reason(custom_logger): 573 custom_logger.create_dut_job() 574 575 reason = "kernel panic" 576 custom_logger.update_status_fail(reason) 577 578 logger_data = custom_logger.logger.data 579 assert "dut_jobs" in logger_data 580 assert len(logger_data["dut_jobs"]) == 1 581 582 dut_job = logger_data["dut_jobs"][0] 583 assert "status" in dut_job 584 assert dut_job["status"] == "fail" 585 assert "dut_job_fail_reason" in dut_job 586 assert dut_job["dut_job_fail_reason"] == reason 587 588 589# Test case for updating status to fail without providing a reason 590def test_update_status_fail_without_reason(custom_logger): 591 custom_logger.create_dut_job() 592 593 custom_logger.update_status_fail() 594 595 # Check if the status is updated and fail reason is empty 596 logger_data = custom_logger.logger.data 597 assert "dut_jobs" in logger_data 598 assert len(logger_data["dut_jobs"]) == 1 599 600 dut_job = logger_data["dut_jobs"][0] 601 assert "status" in dut_job 602 assert dut_job["status"] == "fail" 603 assert "dut_job_fail_reason" in dut_job 604 assert dut_job["dut_job_fail_reason"] == "" 605 606 607# Test case for check_dut_timings with submission time earlier than start time 608def test_check_dut_timings_submission_earlier_than_start(custom_logger, caplog): 609 custom_logger.create_dut_job() 610 611 # Set submission time to be earlier than start time 612 custom_logger.update_dut_time("start", "2023-01-01T11:00:00") 613 custom_logger.update_dut_time("submit", "2023-01-01T12:00:00") 614 615 logger_data = custom_logger.logger.data 616 assert "dut_jobs" in logger_data 617 assert len(logger_data["dut_jobs"]) == 1 618 619 job = logger_data["dut_jobs"][0] 620 621 # Call check_dut_timings 622 custom_logger.check_dut_timings(job) 623 624 # Check if an error message is logged 625 assert "Job submission is happening before job start." in caplog.text 626 627 628# Test case for check_dut_timings with end time earlier than start time 629def test_check_dut_timings_end_earlier_than_start(custom_logger, caplog): 630 custom_logger.create_dut_job() 631 632 # Set end time to be earlier than start time 633 custom_logger.update_dut_time("end", "2023-01-01T11:00:00") 634 custom_logger.update_dut_time("start", "2023-01-01T12:00:00") 635 636 logger_data = custom_logger.logger.data 637 assert "dut_jobs" in logger_data 638 assert len(logger_data["dut_jobs"]) == 1 639 640 job = logger_data["dut_jobs"][0] 641 642 # Call check_dut_timings 643 custom_logger.check_dut_timings(job) 644 645 # Check if an error message is logged 646 assert "Job ended before it started." in caplog.text 647 648 649# Test case for check_dut_timings with valid timing sequence 650def test_check_dut_timings_valid_timing_sequence(custom_logger, caplog): 651 custom_logger.create_dut_job() 652 653 # Set valid timing sequence 654 custom_logger.update_dut_time("submit", "2023-01-01T12:00:00") 655 custom_logger.update_dut_time("start", "2023-01-01T12:30:00") 656 custom_logger.update_dut_time("end", "2023-01-01T13:00:00") 657 658 logger_data = custom_logger.logger.data 659 assert "dut_jobs" in logger_data 660 assert len(logger_data["dut_jobs"]) == 1 661 662 job = logger_data["dut_jobs"][0] 663 664 # Call check_dut_timings 665 custom_logger.check_dut_timings(job) 666 667 # Check that no error messages are logged 668 assert "Job submission is happening before job start." not in caplog.text 669 assert "Job ended before it started." not in caplog.text 670