• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import logging
2import subprocess
3from datetime import datetime
4
5import pytest
6from custom_logger import CustomLogger
7
8
9@pytest.fixture
10def tmp_log_file(tmp_path):
11    return tmp_path / "test_log.json"
12
13
14@pytest.fixture
15def custom_logger(tmp_log_file):
16    return CustomLogger(tmp_log_file)
17
18
19def run_script_with_args(args):
20    import custom_logger
21
22    script_path = custom_logger.__file__
23    return subprocess.run(
24        ["python3", str(script_path), *args], capture_output=True, text=True
25    )
26
27
28# Test case for missing log file
29@pytest.mark.parametrize(
30    "key, value", [("dut_attempt_counter", "1"), ("job_combined_status", "pass")]
31)
32def test_missing_log_file_argument(key, value):
33    result = run_script_with_args(["--update", "key", "value"])
34    assert result.returncode != 0
35
36
37# Parametrize test case for valid update arguments
38@pytest.mark.parametrize(
39    "key, value", [("dut_attempt_counter", "1"), ("job_combined_status", "pass")]
40)
41def test_update_argument_valid(custom_logger, tmp_log_file, key, value):
42    result = run_script_with_args([str(tmp_log_file), "--update", key, value])
43    assert result.returncode == 0
44
45
46# Test case for passing only the key without a value
47def test_update_argument_key_only(custom_logger, tmp_log_file):
48    key = "dut_attempt_counter"
49    result = run_script_with_args([str(tmp_log_file), "--update", key])
50    assert result.returncode != 0
51
52
53# Test case for not passing any key-value pair
54def test_update_argument_no_values(custom_logger, tmp_log_file):
55    result = run_script_with_args([str(tmp_log_file), "--update"])
56    assert result.returncode == 0
57
58
59# Parametrize test case for valid arguments
60@pytest.mark.parametrize(
61    "key, value", [("dut_attempt_counter", "1"), ("job_combined_status", "pass")]
62)
63def test_create_argument_valid(custom_logger, tmp_log_file, key, value):
64    result = run_script_with_args([str(tmp_log_file), "--create-dut-job", key, value])
65    assert result.returncode == 0
66
67
68# Test case for passing only the key without a value
69def test_create_argument_key_only(custom_logger, tmp_log_file):
70    key = "dut_attempt_counter"
71    result = run_script_with_args([str(tmp_log_file), "--create-dut-job", key])
72    assert result.returncode != 0
73
74
75# Test case for not passing any key-value pair
76def test_create_argument_no_values(custom_logger, tmp_log_file):
77    result = run_script_with_args([str(tmp_log_file), "--create-dut-job"])
78    assert result.returncode == 0
79
80
81# Test case for updating a DUT job
82@pytest.mark.parametrize(
83    "key, value", [("status", "hung"), ("dut_state", "Canceling"), ("dut_name", "asus")]
84)
85def test_update_dut_job(custom_logger, tmp_log_file, key, value):
86    result = run_script_with_args([str(tmp_log_file), "--update-dut-job", key, value])
87    assert result.returncode != 0
88
89    result = run_script_with_args([str(tmp_log_file), "--create-dut-job", key, value])
90    assert result.returncode == 0
91
92    result = run_script_with_args([str(tmp_log_file), "--update-dut-job", key, value])
93    assert result.returncode == 0
94
95
96# Test case for updating last DUT job
97def test_update_dut_multiple_job(custom_logger, tmp_log_file):
98    # Create the first DUT job with the first key
99    result = run_script_with_args(
100        [str(tmp_log_file), "--create-dut-job", "status", "hung"]
101    )
102    assert result.returncode == 0
103
104    # Create the second DUT job with the second key
105    result = run_script_with_args(
106        [str(tmp_log_file), "--create-dut-job", "dut_state", "Canceling"]
107    )
108    assert result.returncode == 0
109
110    result = run_script_with_args(
111        [str(tmp_log_file), "--update-dut-job", "dut_name", "asus"]
112    )
113    assert result.returncode == 0
114
115
116# Parametrize test case for valid phase arguments
117@pytest.mark.parametrize(
118    "phase_name",
119    [("Phase1"), ("Phase2"), ("Phase3")],
120)
121def test_create_job_phase_valid(custom_logger, tmp_log_file, phase_name):
122    custom_logger.create_dut_job(status="pass")
123
124    result = run_script_with_args([str(tmp_log_file), "--create-job-phase", phase_name])
125    assert result.returncode == 0
126
127
128# Test case for not passing any arguments for create-job-phase
129def test_create_job_phase_no_arguments(custom_logger, tmp_log_file):
130    custom_logger.create_dut_job(status="pass")
131
132    result = run_script_with_args([str(tmp_log_file), "--create-job-phase"])
133    assert result.returncode != 0
134
135
136# Test case for trying to create a phase job without an existing DUT job
137def test_create_job_phase_no_dut_job(custom_logger, tmp_log_file):
138    phase_name = "Phase1"
139
140    result = run_script_with_args([str(tmp_log_file), "--create-job-phase", phase_name])
141    assert result.returncode != 0
142
143
144# Combined test cases for valid scenarios
145def test_valid_scenarios(custom_logger, tmp_log_file):
146    valid_update_args = [("dut_attempt_counter", "1"), ("job_combined_status", "pass")]
147    for key, value in valid_update_args:
148        result = run_script_with_args([str(tmp_log_file), "--update", key, value])
149        assert result.returncode == 0
150
151    valid_create_args = [
152        ("status", "hung"),
153        ("dut_state", "Canceling"),
154        ("dut_name", "asus"),
155        ("phase_name", "Bootloader"),
156    ]
157    for key, value in valid_create_args:
158        result = run_script_with_args(
159            [str(tmp_log_file), "--create-dut-job", key, value]
160        )
161        assert result.returncode == 0
162
163    result = run_script_with_args(
164        [str(tmp_log_file), "--create-dut-job", "status", "hung"]
165    )
166    assert result.returncode == 0
167
168    result = run_script_with_args(
169        [str(tmp_log_file), "--update-dut-job", "dut_name", "asus"]
170    )
171    assert result.returncode == 0
172
173    result = run_script_with_args(
174        [
175            str(tmp_log_file),
176            "--create-job-phase",
177            "phase_name",
178        ]
179    )
180    assert result.returncode == 0
181
182
183# Parametrize test case for valid update arguments
184@pytest.mark.parametrize(
185    "key, value", [("dut_attempt_counter", "1"), ("job_combined_status", "pass")]
186)
187def test_update(custom_logger, key, value):
188    custom_logger.update(**{key: value})
189    logger_data = custom_logger.logger.data
190
191    assert key in logger_data
192    assert logger_data[key] == value
193
194
195# Test case for updating with a key that already exists
196def test_update_existing_key(custom_logger):
197    key = "status"
198    value = "new_value"
199    custom_logger.logger.data[key] = "old_value"
200    custom_logger.update(**{key: value})
201    logger_data = custom_logger.logger.data
202
203    assert key in logger_data
204    assert logger_data[key] == value
205
206
207# Test case for updating "dut_jobs"
208def test_update_dut_jobs(custom_logger):
209    key1 = "status"
210    value1 = "fail"
211    key2 = "state"
212    value2 = "hung"
213
214    custom_logger.create_dut_job(**{key1: value1})
215    logger_data = custom_logger.logger.data
216
217    job1 = logger_data["dut_jobs"][0]
218    assert key1 in job1
219    assert job1[key1] == value1
220
221    custom_logger.update_dut_job(key2, value2)
222    logger_data = custom_logger.logger.data
223
224    job2 = logger_data["dut_jobs"][0]
225    assert key2 in job2
226    assert job2[key2] == value2
227
228
229# Test case for creating and updating DUT job
230def test_create_dut_job(custom_logger):
231    key = "status"
232    value1 = "pass"
233    value2 = "fail"
234    value3 = "hung"
235
236    reason = "job_combined_status"
237    result = "Finished"
238
239    custom_logger.update(**{reason: result})
240    logger_data = custom_logger.logger.data
241
242    assert reason in logger_data
243    assert logger_data[reason] == result
244
245    # Create the first DUT job
246    custom_logger.create_dut_job(**{key: value1})
247    logger_data = custom_logger.logger.data
248
249    assert "dut_jobs" in logger_data
250    assert isinstance(logger_data["dut_jobs"], list)
251    assert len(logger_data["dut_jobs"]) == 1
252    assert isinstance(logger_data["dut_jobs"][0], dict)
253
254    # Check the values of the keys in the created first DUT job
255    job1 = logger_data["dut_jobs"][0]
256    assert key in job1
257    assert job1[key] == value1
258
259    # Create the second DUT job
260    custom_logger.create_dut_job(**{key: value2})
261    logger_data = custom_logger.logger.data
262
263    assert "dut_jobs" in logger_data
264    assert isinstance(logger_data["dut_jobs"], list)
265    assert len(logger_data["dut_jobs"]) == 2
266    assert isinstance(logger_data["dut_jobs"][1], dict)
267
268    # Check the values of the keys in the created second DUT job
269    job2 = logger_data["dut_jobs"][1]
270    assert key in job2
271    assert job2[key] == value2
272
273    # Update the second DUT job with value3
274    custom_logger.update_dut_job(key, value3)
275    logger_data = custom_logger.logger.data
276
277    # Check the updated value in the second DUT job
278    job2 = logger_data["dut_jobs"][1]
279    assert key in job2
280    assert job2[key] == value3
281
282    # Find the index of the last DUT job
283    last_job_index = len(logger_data["dut_jobs"]) - 1
284
285    # Update the last DUT job
286    custom_logger.update_dut_job("dut_name", "asus")
287    logger_data = custom_logger.logger.data
288
289    # Check the updated value in the last DUT job
290    job2 = logger_data["dut_jobs"][last_job_index]
291    assert "dut_name" in job2
292    assert job2["dut_name"] == "asus"
293
294    # Check that "dut_name" is not present in other DUT jobs
295    for idx, job in enumerate(logger_data["dut_jobs"]):
296        if idx != last_job_index:
297            assert job.get("dut_name") == ""
298
299
300# Test case for updating with missing "dut_jobs" key
301def test_update_dut_job_missing_dut_jobs(custom_logger):
302    key = "status"
303    value = "fail"
304
305    # Attempt to update a DUT job when "dut_jobs" is missing
306    with pytest.raises(ValueError, match="No DUT jobs found."):
307        custom_logger.update_dut_job(key, value)
308
309
310# Test case for creating a job phase
311def test_create_job_phase(custom_logger):
312    custom_logger.create_dut_job(status="pass")
313    phase_name = "Phase1"
314
315    custom_logger.create_job_phase(phase_name)
316    logger_data = custom_logger.logger.data
317
318    assert "dut_jobs" in logger_data
319    assert isinstance(logger_data["dut_jobs"], list)
320    assert len(logger_data["dut_jobs"]) == 1
321
322    job = logger_data["dut_jobs"][0]
323    assert "dut_job_phases" in job
324    assert isinstance(job["dut_job_phases"], list)
325    assert len(job["dut_job_phases"]) == 1
326
327    phase = job["dut_job_phases"][0]
328    assert phase["name"] == phase_name
329    try:
330        datetime.fromisoformat(phase["start_time"])
331        assert True
332    except ValueError:
333        assert False
334    assert phase["end_time"] == ""
335
336
337# Test case for creating multiple phase jobs
338def test_create_multiple_phase_jobs(custom_logger):
339    custom_logger.create_dut_job(status="pass")
340
341    phase_data = [
342        {
343            "phase_name": "Phase1",
344        },
345        {
346            "phase_name": "Phase2",
347        },
348        {
349            "phase_name": "Phase3",
350        },
351    ]
352
353    for data in phase_data:
354        phase_name = data["phase_name"]
355
356        custom_logger.create_job_phase(phase_name)
357
358    logger_data = custom_logger.logger.data
359
360    assert "dut_jobs" in logger_data
361    assert isinstance(logger_data["dut_jobs"], list)
362    assert len(logger_data["dut_jobs"]) == 1
363
364    job = logger_data["dut_jobs"][0]
365    assert "dut_job_phases" in job
366    assert isinstance(job["dut_job_phases"], list)
367    assert len(job["dut_job_phases"]) == len(phase_data)
368
369    for data in phase_data:
370        phase_name = data["phase_name"]
371
372        phase = job["dut_job_phases"][phase_data.index(data)]
373
374        assert phase["name"] == phase_name
375        try:
376            datetime.fromisoformat(phase["start_time"])
377            assert True
378        except ValueError:
379            assert False
380
381        if phase_data.index(data) != len(phase_data) - 1:
382            try:
383                datetime.fromisoformat(phase["end_time"])
384                assert True
385            except ValueError:
386                assert False
387
388    # Check if the end_time of the last phase is an empty string
389    last_phase = job["dut_job_phases"][-1]
390    assert last_phase["end_time"] == ""
391
392
393# Test case for creating multiple dut jobs and updating phase job for last dut job
394def test_create_two_dut_jobs_and_add_phase(custom_logger):
395    # Create the first DUT job
396    custom_logger.create_dut_job(status="pass")
397
398    # Create the second DUT job
399    custom_logger.create_dut_job(status="fail")
400
401    logger_data = custom_logger.logger.data
402
403    assert "dut_jobs" in logger_data
404    assert isinstance(logger_data["dut_jobs"], list)
405    assert len(logger_data["dut_jobs"]) == 2
406
407    first_dut_job = logger_data["dut_jobs"][0]
408    second_dut_job = logger_data["dut_jobs"][1]
409
410    # Add a phase to the second DUT job
411    custom_logger.create_job_phase("Phase1")
412
413    logger_data = custom_logger.logger.data
414
415    assert "dut_jobs" in logger_data
416    assert isinstance(logger_data["dut_jobs"], list)
417    assert len(logger_data["dut_jobs"]) == 2
418
419    first_dut_job = logger_data["dut_jobs"][0]
420    second_dut_job = logger_data["dut_jobs"][1]
421
422    # Check first DUT job does not have a phase
423    assert not first_dut_job.get("dut_job_phases")
424
425    # Check second DUT job has a phase
426    assert second_dut_job.get("dut_job_phases")
427    assert isinstance(second_dut_job["dut_job_phases"], list)
428    assert len(second_dut_job["dut_job_phases"]) == 1
429
430
431# Test case for updating DUT start time
432def test_update_dut_start_time(custom_logger):
433    custom_logger.create_dut_job(status="pass")
434    custom_logger.update_dut_time("start", None)
435
436    logger_data = custom_logger.logger.data
437    assert "dut_jobs" in logger_data
438    assert len(logger_data["dut_jobs"]) == 1
439
440    dut_job = logger_data["dut_jobs"][0]
441    assert "dut_start_time" in dut_job
442    assert dut_job["dut_start_time"] != ""
443
444    try:
445        datetime.fromisoformat(dut_job["dut_start_time"])
446        assert True
447    except ValueError:
448        assert False
449
450
451# Test case for updating DUT submit time
452def test_update_dut_submit_time(custom_logger):
453    custom_time = "2023-11-09T02:37:06Z"
454    custom_logger.create_dut_job(status="pass")
455    custom_logger.update_dut_time("submit", custom_time)
456
457    logger_data = custom_logger.logger.data
458    assert "dut_jobs" in logger_data
459    assert len(logger_data["dut_jobs"]) == 1
460
461    dut_job = logger_data["dut_jobs"][0]
462    assert "dut_submit_time" in dut_job
463
464    try:
465        datetime.fromisoformat(dut_job["dut_submit_time"])
466        assert True
467    except ValueError:
468        assert False
469
470
471# Test case for updating DUT end time
472def test_update_dut_end_time(custom_logger):
473    custom_logger.create_dut_job(status="pass")
474    custom_logger.update_dut_time("end", None)
475
476    logger_data = custom_logger.logger.data
477    assert "dut_jobs" in logger_data
478    assert len(logger_data["dut_jobs"]) == 1
479
480    dut_job = logger_data["dut_jobs"][0]
481    assert "dut_end_time" in dut_job
482
483    try:
484        datetime.fromisoformat(dut_job["dut_end_time"])
485        assert True
486    except ValueError:
487        assert False
488
489
490# Test case for updating DUT time with invalid value
491def test_update_dut_time_invalid_value(custom_logger):
492    custom_logger.create_dut_job(status="pass")
493    with pytest.raises(
494        ValueError,
495        match="Error: Invalid argument provided for --update-dut-time. Use 'start', 'submit', 'end'.",
496    ):
497        custom_logger.update_dut_time("invalid_value", None)
498
499
500# Test case for close_dut_job
501def test_close_dut_job(custom_logger):
502    custom_logger.create_dut_job(status="pass")
503
504    custom_logger.create_job_phase("Phase1")
505    custom_logger.create_job_phase("Phase2")
506
507    custom_logger.close_dut_job()
508
509    logger_data = custom_logger.logger.data
510    assert "dut_jobs" in logger_data
511    assert len(logger_data["dut_jobs"]) == 1
512
513    dut_job = logger_data["dut_jobs"][0]
514    assert "dut_job_phases" in dut_job
515    dut_job_phases = dut_job["dut_job_phases"]
516
517    phase1 = dut_job_phases[0]
518    assert phase1["name"] == "Phase1"
519
520    try:
521        datetime.fromisoformat(phase1["start_time"])
522        assert True
523    except ValueError:
524        assert False
525
526    try:
527        datetime.fromisoformat(phase1["end_time"])
528        assert True
529    except ValueError:
530        assert False
531
532    phase2 = dut_job_phases[1]
533    assert phase2["name"] == "Phase2"
534
535    try:
536        datetime.fromisoformat(phase2["start_time"])
537        assert True
538    except ValueError:
539        assert False
540
541    try:
542        datetime.fromisoformat(phase2["end_time"])
543        assert True
544    except ValueError:
545        assert False
546
547
548# Test case for close
549def test_close(custom_logger):
550    custom_logger.create_dut_job(status="pass")
551
552    custom_logger.close()
553
554    logger_data = custom_logger.logger.data
555    assert "dut_jobs" in logger_data
556    assert len(logger_data["dut_jobs"]) == 1
557    assert "dut_attempt_counter" in logger_data
558    assert logger_data["dut_attempt_counter"] == len(logger_data["dut_jobs"])
559    assert "job_combined_status" in logger_data
560    assert logger_data["job_combined_status"] != ""
561
562    dut_job = logger_data["dut_jobs"][0]
563    assert "submitter_end_time" in dut_job
564    try:
565        datetime.fromisoformat(dut_job["submitter_end_time"])
566        assert True
567    except ValueError:
568        assert False
569
570
571# Test case for updating status to fail with a reason
572def test_update_status_fail_with_reason(custom_logger):
573    custom_logger.create_dut_job()
574
575    reason = "kernel panic"
576    custom_logger.update_status_fail(reason)
577
578    logger_data = custom_logger.logger.data
579    assert "dut_jobs" in logger_data
580    assert len(logger_data["dut_jobs"]) == 1
581
582    dut_job = logger_data["dut_jobs"][0]
583    assert "status" in dut_job
584    assert dut_job["status"] == "fail"
585    assert "dut_job_fail_reason" in dut_job
586    assert dut_job["dut_job_fail_reason"] == reason
587
588
589# Test case for updating status to fail without providing a reason
590def test_update_status_fail_without_reason(custom_logger):
591    custom_logger.create_dut_job()
592
593    custom_logger.update_status_fail()
594
595    # Check if the status is updated and fail reason is empty
596    logger_data = custom_logger.logger.data
597    assert "dut_jobs" in logger_data
598    assert len(logger_data["dut_jobs"]) == 1
599
600    dut_job = logger_data["dut_jobs"][0]
601    assert "status" in dut_job
602    assert dut_job["status"] == "fail"
603    assert "dut_job_fail_reason" in dut_job
604    assert dut_job["dut_job_fail_reason"] == ""
605
606
607# Test case for check_dut_timings with submission time earlier than start time
608def test_check_dut_timings_submission_earlier_than_start(custom_logger, caplog):
609    custom_logger.create_dut_job()
610
611    # Set submission time to be earlier than start time
612    custom_logger.update_dut_time("start", "2023-01-01T11:00:00")
613    custom_logger.update_dut_time("submit", "2023-01-01T12:00:00")
614
615    logger_data = custom_logger.logger.data
616    assert "dut_jobs" in logger_data
617    assert len(logger_data["dut_jobs"]) == 1
618
619    job = logger_data["dut_jobs"][0]
620
621    # Call check_dut_timings
622    custom_logger.check_dut_timings(job)
623
624    # Check if an error message is logged
625    assert "Job submission is happening before job start." in caplog.text
626
627
628# Test case for check_dut_timings with end time earlier than start time
629def test_check_dut_timings_end_earlier_than_start(custom_logger, caplog):
630    custom_logger.create_dut_job()
631
632    # Set end time to be earlier than start time
633    custom_logger.update_dut_time("end", "2023-01-01T11:00:00")
634    custom_logger.update_dut_time("start", "2023-01-01T12:00:00")
635
636    logger_data = custom_logger.logger.data
637    assert "dut_jobs" in logger_data
638    assert len(logger_data["dut_jobs"]) == 1
639
640    job = logger_data["dut_jobs"][0]
641
642    # Call check_dut_timings
643    custom_logger.check_dut_timings(job)
644
645    # Check if an error message is logged
646    assert "Job ended before it started." in caplog.text
647
648
649# Test case for check_dut_timings with valid timing sequence
650def test_check_dut_timings_valid_timing_sequence(custom_logger, caplog):
651    custom_logger.create_dut_job()
652
653    # Set valid timing sequence
654    custom_logger.update_dut_time("submit", "2023-01-01T12:00:00")
655    custom_logger.update_dut_time("start", "2023-01-01T12:30:00")
656    custom_logger.update_dut_time("end", "2023-01-01T13:00:00")
657
658    logger_data = custom_logger.logger.data
659    assert "dut_jobs" in logger_data
660    assert len(logger_data["dut_jobs"]) == 1
661
662    job = logger_data["dut_jobs"][0]
663
664    # Call check_dut_timings
665    custom_logger.check_dut_timings(job)
666
667    # Check that no error messages are logged
668    assert "Job submission is happening before job start." not in caplog.text
669    assert "Job ended before it started." not in caplog.text
670