1# Copyright 2014 The Android Open Source Project 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import json 16import glob 17import logging 18import os 19import os.path 20import re 21import subprocess 22import sys 23import tempfile 24import time 25 26import camera_properties_utils 27import capture_request_utils 28import image_processing_utils 29import its_session_utils 30import numpy as np 31import yaml 32 33YAML_FILE_DIR = os.environ['CAMERA_ITS_TOP'] 34CONFIG_FILE = os.path.join(YAML_FILE_DIR, 'config.yml') 35TEST_KEY_TABLET = 'tablet' 36TEST_KEY_SENSOR_FUSION = 'sensor_fusion' 37LOAD_SCENE_DELAY = 1 # seconds 38ACTIVITY_START_WAIT = 1.5 # seconds 39MERGE_RESULTS_TIMEOUT = 3600 # seconds 40 41NUM_TRIES = 2 42RESULT_PASS = 'PASS' 43RESULT_FAIL = 'FAIL' 44RESULT_NOT_EXECUTED = 'NOT_EXECUTED' 45RESULT_KEY = 'result' 46METRICS_KEY = 'mpc_metrics' 47SUMMARY_KEY = 'summary' 48RESULT_VALUES = {RESULT_PASS, RESULT_FAIL, RESULT_NOT_EXECUTED} 49CTS_VERIFIER_PACKAGE_NAME = 'com.android.cts.verifier' 50ITS_TEST_ACTIVITY = 'com.android.cts.verifier/.camera.its.ItsTestActivity' 51ACTION_ITS_RESULT = 'com.android.cts.verifier.camera.its.ACTION_ITS_RESULT' 52EXTRA_VERSION = 'camera.its.extra.VERSION' 53CURRENT_ITS_VERSION = '1.0' # version number to sync with CtsVerifier 54EXTRA_CAMERA_ID = 'camera.its.extra.CAMERA_ID' 55EXTRA_RESULTS = 'camera.its.extra.RESULTS' 56TIME_KEY_START = 'start' 57TIME_KEY_END = 'end' 58VALID_CONTROLLERS = ('arduino', 'canakit') 59_INT_STR_DICT = {'11': '1_1', '12': '1_2'} # recover replaced '_' in scene def 60_FRONT_CAMERA_ID = '1' 61_PROPERTIES_TO_MATCH = ( 62 'ro.product.model', 'ro.product.name', 'ro.build.display.id', 'ro.revision' 63) 64_MAIN_TESTBED = 0 65 66# All possible scenes 67# Notes on scene names: 68# scene*_1/2/... are same scene split to load balance run times for scenes 69# scene*_a/b/... are similar scenes that share one or more tests 70_ALL_SCENES = [ 71 'scene0', 'scene1_1', 'scene1_2', 'scene2_a', 'scene2_b', 'scene2_c', 72 'scene2_d', 'scene2_e', 'scene2_f', 'scene3', 'scene4', 'scene5', 73 'scene6', os.path.join('scene_extensions', 'scene_hdr'), 74 os.path.join('scene_extensions', 'scene_night'), 'sensor_fusion' 75] 76 77# Scenes that can be automated through tablet display 78_AUTO_SCENES = [ 79 'scene0', 'scene1_1', 'scene1_2', 'scene2_a', 'scene2_b', 'scene2_c', 80 'scene2_d', 'scene2_e', 'scene2_f', 'scene3', 'scene4', 'scene6', 81 os.path.join('scene_extensions', 'scene_hdr'), 82 os.path.join('scene_extensions', 'scene_night') 83] 84 85# Scenes that are logically grouped and can be called as group 86_GROUPED_SCENES = { 87 'scene1': ['scene1_1', 'scene1_2'], 88 'scene2': ['scene2_a', 'scene2_b', 'scene2_c', 'scene2_d', 'scene2_e', 89 'scene2_f'] 90} 91 92# Scenes that have to be run manually regardless of configuration 93_MANUAL_SCENES = ['scene5'] 94 95# Scene requirements for manual testing. 96_SCENE_REQ = { 97 'scene0': None, 98 'scene1_1': 'A grey card covering at least the middle 30% of the scene', 99 'scene1_2': 'A grey card covering at least the middle 30% of the scene', 100 'scene2_a': 'The picture with 3 faces in tests/scene2_a/scene2_a.png', 101 'scene2_b': 'The picture with 3 faces in tests/scene2_b/scene2_b.png', 102 'scene2_c': 'The picture with 3 faces in tests/scene2_c/scene2_c.png', 103 'scene2_d': 'The picture with 3 faces in tests/scene2_d/scene2_d.png', 104 'scene2_e': 'The picture with 3 faces in tests/scene2_e/scene2_e.png', 105 'scene2_f': 'The picture with 3 faces in tests/scene2_f/scene2_f.png', 106 'scene3': 'The ISO12233 chart', 107 'scene4': 'A test chart of a circle covering at least the middle 50% of ' 108 'the scene. See tests/scene4/scene4.png', 109 'scene5': 'Capture images with a diffuser attached to the camera. ' 110 'See source.android.com/docs/compatibility/cts/camera-its-tests#scene5/diffuser ' 111 'for more details', 112 'scene6': 'A grid of black circles on a white background. ' 113 'See tests/scene6/scene6.png', 114 # Use os.path to avoid confusion on other platforms 115 os.path.join('scene_extensions', 'scene_hdr'): ( 116 'A tablet displayed scene with a face on the left ' 117 'and a low-contrast QR code on the right. ' 118 'See tests/scene_extensions/scene_hdr/scene_hdr.png' 119 ), 120 os.path.join('scene_extensions', 'scene_night'): ( 121 'A tablet displayed scene with a white circle ' 122 'and four smaller circles inside of it. ' 123 'See tests/scene_extensions/scene_night/scene_night.png' 124 ), 125 'sensor_fusion': 'A checkerboard pattern for phone to rotate in front of ' 126 'in tests/sensor_fusion/checkerboard.pdf\n' 127 'See tests/sensor_fusion/SensorFusion.pdf for detailed ' 128 'instructions.\nNote that this test will be skipped ' 129 'on devices not supporting REALTIME camera timestamp.', 130} 131 132 133SUB_CAMERA_TESTS = { 134 'scene0': [ 135 'test_burst_capture', 136 'test_jitter', 137 'test_metadata', 138 'test_read_write', 139 'test_sensor_events', 140 'test_solid_color_test_pattern', 141 'test_unified_timestamps', 142 ], 143 'scene1_1': [ 144 'test_burst_sameness_manual', 145 'test_dng_noise_model', 146 'test_exposure', 147 'test_linearity', 148 ], 149 'scene1_2': [ 150 'test_raw_exposure', 151 'test_raw_sensitivity', 152 'test_yuv_plus_raw', 153 ], 154 'scene2_a': [ 155 'test_num_faces', 156 ], 157 'scene4': [ 158 'test_aspect_ratio_and_crop', 159 ], 160 'sensor_fusion': [ 161 'test_sensor_fusion', 162 ], 163} 164 165_LIGHTING_CONTROL_TESTS = [ 166 'test_auto_flash.py', 167 'test_preview_min_frame_rate.py', 168 'test_led_snapshot.py', 169 'test_night_extension.py', 170 'test_hdr_extension.py', 171 ] 172 173_DST_SCENE_DIR = '/sdcard/Download/' 174MOBLY_TEST_SUMMARY_TXT_FILE = 'test_mobly_summary.txt' 175 176 177def run(cmd): 178 """Replaces os.system call, while hiding stdout+stderr messages.""" 179 with open(os.devnull, 'wb') as devnull: 180 subprocess.check_call(cmd.split(), stdout=devnull, stderr=subprocess.STDOUT) 181 182 183def check_cts_apk_installed(device_id): 184 """Verifies that CtsVerifer.apk is installed on a given device.""" 185 verify_cts_cmd = f'adb -s {device_id} shell pm list packages | grep {CTS_VERIFIER_PACKAGE_NAME}' 186 raw_output = subprocess.check_output( 187 verify_cts_cmd, stderr=subprocess.STDOUT, shell=True 188 ) 189 output = str(raw_output.decode('utf-8')).strip() 190 if CTS_VERIFIER_PACKAGE_NAME not in output: 191 raise AssertionError( 192 f"{CTS_VERIFIER_PACKAGE_NAME} was not found in {device_id}'s list of packages!" 193 ) 194 195 196def report_result(device_id, camera_id, results): 197 """Sends a pass/fail result to the device, via an intent. 198 199 Args: 200 device_id: The ID string of the device to report the results to. 201 camera_id: The ID string of the camera for which to report pass/fail. 202 results: a dictionary contains all ITS scenes as key and result/summary of 203 current ITS run. See test_report_result unit test for an example. 204 """ 205 adb = f'adb -s {device_id}' 206 initialization_cmds = ( 207 f'{adb} shell input keyevent KEYCODE_WAKEUP', 208 f'{adb} shell input keyevent KEYCODE_MENU', 209 (f'{adb} shell am start -n {ITS_TEST_ACTIVITY} ' 210 '--activity-brought-to-front') 211 ) 212 # Awaken if necessary and start ItsTestActivity to receive test results 213 for cmd in initialization_cmds: 214 run(cmd) 215 time.sleep(ACTIVITY_START_WAIT) 216 217 # Validate/process results argument 218 for scene in results: 219 if RESULT_KEY not in results[scene]: 220 raise ValueError(f'ITS result not found for {scene}') 221 if results[scene][RESULT_KEY] not in RESULT_VALUES: 222 raise ValueError(f'Unknown ITS result for {scene}: {results[RESULT_KEY]}') 223 if SUMMARY_KEY in results[scene]: 224 device_summary_path = f'/sdcard/its_camera{camera_id}_{scene}.txt' 225 run('%s push %s %s' % 226 (adb, results[scene][SUMMARY_KEY], device_summary_path)) 227 results[scene][SUMMARY_KEY] = device_summary_path 228 229 json_results = json.dumps(results) 230 cmd = (f"{adb} shell am broadcast -a {ACTION_ITS_RESULT} --es {EXTRA_VERSION}" 231 f" {CURRENT_ITS_VERSION} --es {EXTRA_CAMERA_ID} {camera_id} --es " 232 f"{EXTRA_RESULTS} \'{json_results}\'") 233 if len(cmd) > 8000: 234 logging.info('ITS command string might be too long! len:%s', len(cmd)) 235 run(cmd) 236 237 238def write_result(testbed_index, device_id, camera_id, results): 239 """Writes results to a temporary file for merging. 240 241 Args: 242 testbed_index: the index of a finished testbed. 243 device_id: the ID string of the device that created results. 244 camera_id: the ID string of the camera of the device. 245 results: a dictionary that contains all ITS scenes as key 246 and result/summary of current ITS run. 247 """ 248 result = {'device_id': device_id, 'results': results} 249 file_name = f'testbed_{testbed_index}_camera_{camera_id}.tmp' 250 with open(file_name, 'w') as f: 251 json.dump(result, f) 252 253 254def parse_testbeds(completed_testbeds): 255 """Parses completed testbeds and yields device_id, camera_id, and results. 256 257 Args: 258 completed_testbeds: an iterable of completed testbed indices. 259 Yields: 260 device_id: the device associated with the testbed. 261 camera_id: one of the camera_ids associated with the testbed. 262 results: the dictionary with scenes and result/summary of testbed's run. 263 """ 264 for i in completed_testbeds: 265 for file_name in glob.glob(f'testbed_{i}_camera_*.tmp'): 266 camera_id = file_name.split('camera_')[1].split('.tmp')[0] 267 device_id = '' 268 results = {} 269 with open(file_name, 'r') as f: 270 testbed_data = json.load(f) 271 device_id = testbed_data['device_id'] 272 results = testbed_data['results'] 273 if not device_id or not results: 274 raise ValueError(f'device_id or results for {file_name} not found.') 275 yield device_id, camera_id, results 276 277 278def get_device_property(device_id, property_name): 279 """Get property of a given device. 280 281 Args: 282 device_id: the ID string of a device. 283 property_name: the desired property string. 284 Returns: 285 The value of the property. 286 """ 287 property_cmd = f'adb -s {device_id} shell getprop {property_name}' 288 raw_output = subprocess.check_output( 289 property_cmd, stderr=subprocess.STDOUT, shell=True) 290 return str(raw_output.decode('utf-8')).strip() 291 292 293def are_devices_similar(device_id_1, device_id_2): 294 """Checks if key dimensions are the same between devices. 295 296 Args: 297 device_id_1: the ID string of the _MAIN_TESTBED device. 298 device_id_2: the ID string of another device. 299 Returns: 300 True if both devices share key dimensions. 301 """ 302 for property_to_match in _PROPERTIES_TO_MATCH: 303 property_value_1 = get_device_property(device_id_1, property_to_match) 304 property_value_2 = get_device_property(device_id_2, property_to_match) 305 if property_value_1 != property_value_2: 306 logging.error('%s does not match %s for %s', 307 property_value_1, property_value_2, property_to_match) 308 return False 309 return True 310 311 312def load_scenes_on_tablet(scene, tablet_id): 313 """Copies scenes onto the tablet before running the tests. 314 315 Args: 316 scene: Name of the scene to copy image files. 317 tablet_id: adb id of tablet 318 """ 319 logging.info('Copying files to tablet: %s', tablet_id) 320 scene_dir = os.listdir( 321 os.path.join(os.environ['CAMERA_ITS_TOP'], 'tests', scene)) 322 for file_name in scene_dir: 323 if file_name.endswith('.png'): 324 src_scene_file = os.path.join(os.environ['CAMERA_ITS_TOP'], 'tests', 325 scene, file_name) 326 cmd = f'adb -s {tablet_id} push {src_scene_file} {_DST_SCENE_DIR}' 327 subprocess.Popen(cmd.split()) 328 time.sleep(LOAD_SCENE_DELAY) 329 logging.info('Finished copying files to tablet.') 330 331 332def check_manual_scenes(device_id, camera_id, scene, out_path): 333 """Halt run to change scenes. 334 335 Args: 336 device_id: id of device 337 camera_id: id of camera 338 scene: Name of the scene to copy image files. 339 out_path: output file location 340 """ 341 with its_session_utils.ItsSession( 342 device_id=device_id, 343 camera_id=camera_id) as cam: 344 props = cam.get_camera_properties() 345 props = cam.override_with_hidden_physical_camera_props(props) 346 347 while True: 348 input(f'\n Press <ENTER> after positioning camera {camera_id} with ' 349 f'{scene}.\n The scene setup should be: \n {_SCENE_REQ[scene]}\n') 350 # Converge 3A prior to capture 351 if scene == 'scene5': 352 cam.do_3a(do_af=False, lock_ae=camera_properties_utils.ae_lock(props), 353 lock_awb=camera_properties_utils.awb_lock(props)) 354 else: 355 cam.do_3a() 356 req, fmt = capture_request_utils.get_fastest_auto_capture_settings(props) 357 logging.info('Capturing an image to check the test scene') 358 cap = cam.do_capture(req, fmt) 359 img = image_processing_utils.convert_capture_to_rgb_image(cap) 360 img_name = os.path.join(out_path, f'test_{scene}.jpg') 361 logging.info('Please check scene setup in %s', img_name) 362 image_processing_utils.write_image(img, img_name) 363 choice = input(f'Is the image okay for ITS {scene}? (Y/N)').lower() 364 if choice == 'y': 365 break 366 367 368def get_config_file_contents(): 369 """Read the config file contents from a YML file. 370 371 Args: 372 None 373 374 Returns: 375 config_file_contents: a dict read from config.yml 376 """ 377 with open(CONFIG_FILE) as file: 378 config_file_contents = yaml.safe_load(file) 379 return config_file_contents 380 381 382def get_test_params(config_file_contents): 383 """Reads the config file parameters. 384 385 Args: 386 config_file_contents: dict read from config.yml file 387 388 Returns: 389 dict of test parameters 390 """ 391 test_params = None 392 for _, j in config_file_contents.items(): 393 for datadict in j: 394 test_params = datadict.get('TestParams') 395 return test_params 396 397 398def get_device_serial_number(device, config_file_contents): 399 """Returns the serial number of the device with label from the config file. 400 401 The config file contains TestBeds dictionary which contains Controllers and 402 Android Device dicts.The two devices used by the test per box are listed 403 here labels dut and tablet. Parse through the nested TestBeds dict to get 404 the Android device details. 405 406 Args: 407 device: String device label as specified in config file.dut/tablet 408 config_file_contents: dict read from config.yml file 409 """ 410 411 for _, j in config_file_contents.items(): 412 for datadict in j: 413 android_device_contents = datadict.get('Controllers') 414 for device_dict in android_device_contents.get('AndroidDevice'): 415 for _, label in device_dict.items(): 416 if label == 'tablet': 417 tablet_device_id = str(device_dict.get('serial')) 418 if label == 'dut': 419 dut_device_id = str(device_dict.get('serial')) 420 if device == 'tablet': 421 return tablet_device_id 422 else: 423 return dut_device_id 424 425 426def get_updated_yml_file(yml_file_contents): 427 """Create a new yml file and write the testbed contents in it. 428 429 This testbed file is per box and contains all the parameters and 430 device id used by the mobly tests. 431 432 Args: 433 yml_file_contents: Data to write in yml file. 434 435 Returns: 436 Updated yml file contents. 437 """ 438 os.chmod(YAML_FILE_DIR, 0o755) 439 file_descriptor, new_yaml_file = tempfile.mkstemp( 440 suffix='.yml', prefix='config_', dir=YAML_FILE_DIR) 441 os.close(file_descriptor) 442 with open(new_yaml_file, 'w') as f: 443 yaml.dump(yml_file_contents, stream=f, default_flow_style=False) 444 new_yaml_file_name = os.path.basename(new_yaml_file) 445 return new_yaml_file_name 446 447 448def enable_external_storage(device_id): 449 """Override apk mode to allow write to external storage. 450 451 Args: 452 device_id: Serial number of the device. 453 454 """ 455 cmd = (f'adb -s {device_id} shell appops ' 456 'set com.android.cts.verifier MANAGE_EXTERNAL_STORAGE allow') 457 run(cmd) 458 459 460def get_available_cameras(device_id, camera_id): 461 """Get available camera devices in the current state. 462 463 Args: 464 device_id: Serial number of the device. 465 camera_id: Logical camera_id 466 467 Returns: 468 List of all the available camera_ids. 469 """ 470 with its_session_utils.ItsSession( 471 device_id=device_id, 472 camera_id=camera_id) as cam: 473 props = cam.get_camera_properties() 474 props = cam.override_with_hidden_physical_camera_props(props) 475 unavailable_physical_cameras = cam.get_unavailable_physical_cameras( 476 camera_id) 477 unavailable_physical_ids = unavailable_physical_cameras[ 478 'unavailablePhysicalCamerasArray'] 479 output = cam.get_camera_ids() 480 all_camera_ids = output['cameraIdArray'] 481 # Concat camera_id, physical camera_id and sub camera separator 482 unavailable_physical_ids = [f'{camera_id}.{s}' 483 for s in unavailable_physical_ids] 484 for i in unavailable_physical_ids: 485 if i in all_camera_ids: 486 all_camera_ids.remove(i) 487 logging.debug('available camera ids: %s', all_camera_ids) 488 return all_camera_ids 489 490 491def get_unavailable_physical_cameras(device_id, camera_id): 492 """Get unavailable physical cameras in the current state. 493 494 Args: 495 device_id: Serial number of the device. 496 camera_id: Logical camera device id 497 498 Returns: 499 List of all the unavailable camera_ids. 500 """ 501 with its_session_utils.ItsSession( 502 device_id=device_id, 503 camera_id=camera_id) as cam: 504 unavailable_physical_cameras = cam.get_unavailable_physical_cameras( 505 camera_id) 506 unavailable_physical_ids = unavailable_physical_cameras[ 507 'unavailablePhysicalCamerasArray'] 508 unavailable_physical_ids = [f'{camera_id}.{s}' 509 for s in unavailable_physical_ids] 510 logging.debug('Unavailable physical camera ids: %s', 511 unavailable_physical_ids) 512 return unavailable_physical_ids 513 514 515def is_device_folded(device_id): 516 """Returns True if the foldable device is in folded state. 517 518 Args: 519 device_id: Serial number of the foldable device. 520 """ 521 cmd = (f'adb -s {device_id} shell cmd device_state state') 522 result = subprocess.getoutput(cmd) 523 if 'CLOSE' in result: 524 return True 525 return False 526 527 528def main(): 529 """Run all the Camera ITS automated tests. 530 531 Script should be run from the top-level CameraITS directory. 532 533 Command line arguments: 534 camera: the camera(s) to be tested. Use comma to separate multiple 535 camera Ids. Ex: "camera=0,1" or "camera=1" 536 scenes: the test scene(s) to be executed. Use comma to separate 537 multiple scenes. Ex: "scenes=scene0,scene1_1" or 538 "scenes=0,1_1,sensor_fusion" (sceneX can be abbreviated by X 539 where X is scene name minus 'scene') 540 """ 541 logging.basicConfig(level=logging.INFO) 542 # Make output directories to hold the generated files. 543 topdir = tempfile.mkdtemp(prefix='CameraITS_') 544 try: 545 subprocess.call(['chmod', 'g+rx', topdir]) 546 except OSError as e: 547 logging.info(repr(e)) 548 549 scenes = [] 550 camera_id_combos = [] 551 testbed_index = None 552 num_testbeds = None 553 # Override camera, scenes and testbed with cmd line values if available 554 for s in list(sys.argv[1:]): 555 if 'scenes=' in s: 556 scenes = s.split('=')[1].split(',') 557 elif 'camera=' in s: 558 camera_id_combos = s.split('=')[1].split(',') 559 elif 'testbed_index=' in s: 560 testbed_index = int(s.split('=')[1]) 561 elif 'num_testbeds=' in s: 562 num_testbeds = int(s.split('=')[1]) 563 else: 564 raise ValueError(f'Unknown argument {s}') 565 if testbed_index is None and num_testbeds is not None: 566 raise ValueError( 567 'testbed_index must be specified if num_testbeds is specified.') 568 if (testbed_index is not None and num_testbeds is not None and 569 testbed_index >= num_testbeds): 570 raise ValueError('testbed_index must be less than num_testbeds. ' 571 'testbed_index starts at 0.') 572 573 # Read config file and extract relevant TestBed 574 config_file_contents = get_config_file_contents() 575 if testbed_index is None: 576 for i in config_file_contents['TestBeds']: 577 if scenes == ['sensor_fusion']: 578 if TEST_KEY_SENSOR_FUSION not in i['Name'].lower(): 579 config_file_contents['TestBeds'].remove(i) 580 else: 581 if TEST_KEY_SENSOR_FUSION in i['Name'].lower(): 582 config_file_contents['TestBeds'].remove(i) 583 else: 584 config_file_contents = { 585 'TestBeds': [config_file_contents['TestBeds'][testbed_index]] 586 } 587 588 # Get test parameters from config file 589 test_params_content = get_test_params(config_file_contents) 590 if not camera_id_combos: 591 camera_id_combos = str(test_params_content['camera']).split(',') 592 if not scenes: 593 scenes = str(test_params_content['scene']).split(',') 594 scenes = [_INT_STR_DICT.get(n, n) for n in scenes] # recover '1_1' & '1_2' 595 596 device_id = get_device_serial_number('dut', config_file_contents) 597 # Enable external storage on DUT to send summary report to CtsVerifier.apk 598 enable_external_storage(device_id) 599 600 # Verify that CTS Verifier is installed 601 check_cts_apk_installed(device_id) 602 # Check whether the dut is foldable or not 603 testing_foldable_device = True if test_params_content[ 604 'foldable_device'] == 'True' else False 605 available_camera_ids_to_test_foldable = [] 606 if testing_foldable_device: 607 logging.debug('Testing foldable device.') 608 # Check the state of foldable device. True if device is folded, 609 # false if the device is opened. 610 device_folded = is_device_folded(device_id) 611 # list of available camera_ids to be tested in device state 612 available_camera_ids_to_test_foldable = get_available_cameras( 613 device_id, _FRONT_CAMERA_ID) 614 615 config_file_test_key = config_file_contents['TestBeds'][0]['Name'].lower() 616 logging.info('Saving %s output files to: %s', config_file_test_key, topdir) 617 if TEST_KEY_TABLET in config_file_test_key: 618 tablet_id = get_device_serial_number('tablet', config_file_contents) 619 tablet_name_cmd = f'adb -s {tablet_id} shell getprop ro.build.product' 620 raw_output = subprocess.check_output( 621 tablet_name_cmd, stderr=subprocess.STDOUT, shell=True) 622 tablet_name = str(raw_output.decode('utf-8')).strip() 623 logging.debug('Tablet name: %s', tablet_name) 624 brightness = test_params_content['brightness'] 625 its_session_utils.validate_tablet_brightness(tablet_name, brightness) 626 else: 627 tablet_id = None 628 629 testing_sensor_fusion_with_controller = False 630 if TEST_KEY_SENSOR_FUSION in config_file_test_key: 631 if test_params_content['rotator_cntl'].lower() in VALID_CONTROLLERS: 632 testing_sensor_fusion_with_controller = True 633 634 testing_flash_with_controller = False 635 if (TEST_KEY_TABLET in config_file_test_key or 636 'manual' in config_file_test_key): 637 if test_params_content.get('lighting_cntl', 'None').lower() == 'arduino': 638 testing_flash_with_controller = True 639 640 # Prepend 'scene' if not specified at cmd line 641 for i, s in enumerate(scenes): 642 if (not s.startswith('scene') and 643 not s.startswith(('sensor_fusion', '<scene-name>'))): 644 scenes[i] = f'scene{s}' 645 646 # Expand GROUPED_SCENES and remove any duplicates 647 scenes = [_GROUPED_SCENES[s] if s in _GROUPED_SCENES else s for s in scenes] 648 scenes = np.hstack(scenes).tolist() 649 scenes = sorted(set(scenes), key=scenes.index) 650 # List of scenes to be executed in folded state will have '_folded' 651 # prefix. This will help distinguish the test results from folded vs 652 # open device state for front camera_ids. 653 folded_device_scenes = [] 654 for scene in scenes: 655 folded_device_scenes.append(f'{scene}_folded') 656 657 logging.info('Running ITS on device: %s, camera(s): %s, scene(s): %s', 658 device_id, camera_id_combos, scenes) 659 660 # Determine if manual run 661 if tablet_id is not None and not set(scenes).intersection(_MANUAL_SCENES): 662 auto_scene_switch = True 663 else: 664 auto_scene_switch = False 665 logging.info('No tablet: manual, sensor_fusion, or scene5 testing.') 666 667 folded_prompted = False 668 opened_prompted = False 669 for camera_id in camera_id_combos: 670 test_params_content['camera'] = camera_id 671 results = {} 672 unav_cameras = [] 673 # Get the list of unavailable cameras in current device state. 674 # These camera_ids should not be tested in current device state. 675 if testing_foldable_device: 676 unav_cameras = get_unavailable_physical_cameras( 677 device_id, _FRONT_CAMERA_ID) 678 679 if testing_foldable_device: 680 device_state = 'folded' if device_folded else 'opened' 681 682 testing_folded_front_camera = (testing_foldable_device and 683 device_folded and 684 _FRONT_CAMERA_ID in camera_id) 685 686 # Raise an assertion error if there is any camera unavailable in 687 # current device state. Usually scenes with suffix 'folded' will 688 # be executed in folded state. 689 if (testing_foldable_device and 690 _FRONT_CAMERA_ID in camera_id and camera_id in unav_cameras): 691 raise AssertionError( 692 f'Camera {camera_id} is unavailable in device state {device_state}' 693 f' and cannot be tested with device {device_state}!') 694 695 if (testing_folded_front_camera and camera_id not in unav_cameras 696 and not folded_prompted): 697 folded_prompted = True 698 input('\nYou are testing a foldable device in folded state. ' 699 'Please make sure the device is folded and press <ENTER> ' 700 'after positioning properly.\n') 701 702 if (testing_foldable_device and 703 not device_folded and _FRONT_CAMERA_ID in camera_id and 704 camera_id not in unav_cameras and not opened_prompted): 705 opened_prompted = True 706 input('\nYou are testing a foldable device in opened state. ' 707 'Please make sure the device is unfolded and press <ENTER> ' 708 'after positioning properly.\n') 709 710 # Run through all scenes if user does not supply one and config file doesn't 711 # have specific scene name listed. 712 if its_session_utils.SUB_CAMERA_SEPARATOR in camera_id: 713 possible_scenes = list(SUB_CAMERA_TESTS.keys()) 714 if auto_scene_switch: 715 possible_scenes.remove('sensor_fusion') 716 else: 717 possible_scenes = _AUTO_SCENES if auto_scene_switch else _ALL_SCENES 718 719 if '<scene-name>' in scenes: 720 per_camera_scenes = possible_scenes 721 else: 722 # Validate user input scene names 723 per_camera_scenes = [] 724 for s in scenes: 725 if s in possible_scenes: 726 per_camera_scenes.append(s) 727 if not per_camera_scenes: 728 raise ValueError('No valid scene specified for this camera.') 729 730 # Folded state scenes will have 'folded' suffix only for 731 # front cameras since rear cameras are common in both folded 732 # and unfolded state. 733 foldable_per_camera_scenes = [] 734 if testing_folded_front_camera: 735 if camera_id not in available_camera_ids_to_test_foldable: 736 raise AssertionError(f'camera {camera_id} is not available.') 737 for s in per_camera_scenes: 738 foldable_per_camera_scenes.append(f'{s}_folded') 739 740 if foldable_per_camera_scenes: 741 per_camera_scenes = foldable_per_camera_scenes 742 743 logging.info('camera: %s, scene(s): %s', camera_id, per_camera_scenes) 744 745 if testing_folded_front_camera: 746 all_scenes = [f'{s}_folded' for s in _ALL_SCENES] 747 else: 748 all_scenes = _ALL_SCENES 749 750 for s in all_scenes: 751 results[s] = {RESULT_KEY: RESULT_NOT_EXECUTED} 752 753 # assert device folded testing scenes with suffix 'folded' 754 if testing_foldable_device and 'folded' in s: 755 if not device_folded: 756 raise AssertionError('Device should be folded during' 757 ' testing scenes with suffix "folded"') 758 759 # A subdir in topdir will be created for each camera_id. All scene test 760 # output logs for each camera id will be stored in this subdir. 761 # This output log path is a mobly param : LogPath 762 cam_id_string = f"cam_id_{camera_id.replace(its_session_utils.SUB_CAMERA_SEPARATOR, '_')}" 763 mobly_output_logs_path = os.path.join(topdir, cam_id_string) 764 os.mkdir(mobly_output_logs_path) 765 tot_pass = 0 766 for s in per_camera_scenes: 767 results[s]['TEST_STATUS'] = [] 768 results[s][METRICS_KEY] = [] 769 770 # unit is millisecond for execution time record in CtsVerifier 771 scene_start_time = int(round(time.time() * 1000)) 772 scene_test_summary = f'Cam{camera_id} {s}' + '\n' 773 mobly_scene_output_logs_path = os.path.join(mobly_output_logs_path, s) 774 775 # Since test directories do not have 'folded' in the name, we need 776 # to remove that suffix for the path of the scenes to be loaded 777 # on the tablets 778 testing_scene = s 779 if 'folded' in s: 780 testing_scene = s.split('_folded')[0] 781 test_params_content['scene'] = testing_scene 782 test_params_content['scene_with_suffix'] = s 783 784 if auto_scene_switch: 785 # Copy scene images onto the tablet 786 if 'scene0' not in testing_scene: 787 load_scenes_on_tablet(testing_scene, tablet_id) 788 else: 789 # Check manual scenes for correctness 790 if 'scene0' not in testing_scene and not testing_sensor_fusion_with_controller: 791 check_manual_scenes(device_id, camera_id, testing_scene, 792 mobly_output_logs_path) 793 794 scene_test_list = [] 795 config_file_contents['TestBeds'][0]['TestParams'] = test_params_content 796 # Add the MoblyParams to config.yml file with the path to store camera_id 797 # test results. This is a separate dict other than TestBeds. 798 mobly_params_dict = { 799 'MoblyParams': { 800 'LogPath': mobly_scene_output_logs_path 801 } 802 } 803 config_file_contents.update(mobly_params_dict) 804 logging.debug('Final config file contents: %s', config_file_contents) 805 new_yml_file_name = get_updated_yml_file(config_file_contents) 806 logging.info('Using %s as temporary config yml file', new_yml_file_name) 807 if camera_id.rfind(its_session_utils.SUB_CAMERA_SEPARATOR) == -1: 808 scene_dir = os.listdir( 809 os.path.join(os.environ['CAMERA_ITS_TOP'], 'tests', testing_scene)) 810 for file_name in scene_dir: 811 if file_name.endswith('.py') and 'test' in file_name: 812 scene_test_list.append(file_name) 813 else: # sub-camera 814 if SUB_CAMERA_TESTS.get(testing_scene): 815 scene_test_list = [f'{test}.py' for test in SUB_CAMERA_TESTS[ 816 testing_scene]] 817 else: 818 scene_test_list = [] 819 scene_test_list.sort() 820 821 # Run tests for scene 822 logging.info('Running tests for %s with camera %s', 823 testing_scene, camera_id) 824 num_pass = 0 825 num_skip = 0 826 num_not_mandated_fail = 0 827 num_fail = 0 828 for test in scene_test_list: 829 # Handle repeated test 830 if 'tests/' in test: 831 cmd = [ 832 'python3', 833 os.path.join(os.environ['CAMERA_ITS_TOP'], test), '-c', 834 f'{new_yml_file_name}' 835 ] 836 else: 837 cmd = [ 838 'python3', 839 os.path.join(os.environ['CAMERA_ITS_TOP'], 'tests', 840 testing_scene, test), 841 '-c', 842 f'{new_yml_file_name}' 843 ] 844 for num_try in range(NUM_TRIES): 845 # Handle manual lighting control redirected stdout in test 846 if (test in _LIGHTING_CONTROL_TESTS and 847 not testing_flash_with_controller): 848 print('Turn lights OFF in rig and press <ENTER> to continue.') 849 850 # pylint: disable=subprocess-run-check 851 with open( 852 os.path.join(topdir, MOBLY_TEST_SUMMARY_TXT_FILE), 'w') as fp: 853 output = subprocess.run(cmd, stdout=fp) 854 # pylint: enable=subprocess-run-check 855 856 # Parse mobly logs to determine SKIP, NOT_YET_MANDATED, and 857 # socket FAILs. 858 with open( 859 os.path.join(topdir, MOBLY_TEST_SUMMARY_TXT_FILE), 'r') as file: 860 test_code = output.returncode 861 test_skipped = False 862 test_not_yet_mandated = False 863 test_mpc_req = '' 864 content = file.read() 865 866 # Find media performance class logging 867 lines = content.splitlines() 868 for one_line in lines: 869 # regular expression pattern must match 870 # MPC12_CAMERA_LAUNCH_PATTERN or MPC12_JPEG_CAPTURE_PATTERN in 871 # ItsTestActivity.java. 872 mpc_string_match = re.search( 873 '^(1080p_jpeg_capture_time_ms:|camera_launch_time_ms:)', 874 one_line) 875 if mpc_string_match: 876 test_mpc_req = one_line 877 break 878 879 if 'Test skipped' in content: 880 return_string = 'SKIP ' 881 num_skip += 1 882 test_skipped = True 883 break 884 885 if 'Not yet mandated test' in content: 886 return_string = 'FAIL*' 887 num_not_mandated_fail += 1 888 test_not_yet_mandated = True 889 break 890 891 if test_code == 0 and not test_skipped: 892 return_string = 'PASS ' 893 num_pass += 1 894 break 895 896 if test_code == 1 and not test_not_yet_mandated: 897 return_string = 'FAIL ' 898 if 'Problem with socket' in content and num_try != NUM_TRIES-1: 899 logging.info('Retry %s/%s', s, test) 900 else: 901 num_fail += 1 902 break 903 os.remove(os.path.join(topdir, MOBLY_TEST_SUMMARY_TXT_FILE)) 904 status_prefix = '' 905 if testbed_index is not None: 906 status_prefix = config_file_test_key + ':' 907 logging.info('%s%s %s/%s', status_prefix, return_string, s, test) 908 test_name = test.split('/')[-1].split('.')[0] 909 results[s]['TEST_STATUS'].append({ 910 'test': test_name, 911 'status': return_string.strip()}) 912 if test_mpc_req: 913 results[s][METRICS_KEY].append(test_mpc_req) 914 msg_short = f'{return_string} {test}' 915 scene_test_summary += msg_short + '\n' 916 if test in _LIGHTING_CONTROL_TESTS and not testing_flash_with_controller: 917 print('Turn lights ON in rig and press <ENTER> to continue.') 918 919 # unit is millisecond for execution time record in CtsVerifier 920 scene_end_time = int(round(time.time() * 1000)) 921 skip_string = '' 922 tot_tests = len(scene_test_list) 923 tot_tests_run = tot_tests - num_skip 924 if tot_tests_run != 0: 925 tests_passed_ratio = (num_pass + num_not_mandated_fail) / tot_tests_run 926 else: 927 tests_passed_ratio = (num_pass + num_not_mandated_fail) / 100.0 928 tests_passed_ratio_format = f'{(100 * tests_passed_ratio):.1f}%' 929 if num_skip > 0: 930 skip_string = f",{num_skip} test{'s' if num_skip > 1 else ''} skipped" 931 test_result = (f'{num_pass + num_not_mandated_fail} / {tot_tests_run} ' 932 f'tests passed ({tests_passed_ratio_format}){skip_string}') 933 logging.info(test_result) 934 if num_not_mandated_fail > 0: 935 logging.info('(*) %s not_yet_mandated tests failed', 936 num_not_mandated_fail) 937 938 tot_pass += num_pass 939 logging.info('scene tests: %s, Total tests passed: %s', tot_tests, 940 tot_pass) 941 if tot_tests > 0: 942 logging.info('%s compatibility score: %.f/100\n', 943 s, 100 * num_pass / tot_tests) 944 scene_test_summary_path = os.path.join(mobly_scene_output_logs_path, 945 'scene_test_summary.txt') 946 with open(scene_test_summary_path, 'w') as f: 947 f.write(scene_test_summary) 948 results[s][RESULT_KEY] = (RESULT_PASS if num_fail == 0 else RESULT_FAIL) 949 results[s][SUMMARY_KEY] = scene_test_summary_path 950 results[s][TIME_KEY_START] = scene_start_time 951 results[s][TIME_KEY_END] = scene_end_time 952 else: 953 logging.info('%s compatibility score: 0/100\n') 954 955 # Delete temporary yml file after scene run. 956 new_yaml_file_path = os.path.join(YAML_FILE_DIR, new_yml_file_name) 957 os.remove(new_yaml_file_path) 958 959 # Log results per camera 960 if num_testbeds is None or testbed_index == _MAIN_TESTBED: 961 logging.info('Reporting camera %s ITS results to CtsVerifier', camera_id) 962 report_result(device_id, camera_id, results) 963 else: 964 write_result(testbed_index, device_id, camera_id, results) 965 966 logging.info('Test execution completed.') 967 968 # Power down tablet 969 if tablet_id: 970 cmd = f'adb -s {tablet_id} shell input keyevent KEYCODE_POWER' 971 subprocess.Popen(cmd.split()) 972 973 if num_testbeds is not None: 974 if testbed_index == _MAIN_TESTBED: 975 logging.info('Waiting for all testbeds to finish.') 976 start = time.time() 977 completed_testbeds = set() 978 while time.time() < start + MERGE_RESULTS_TIMEOUT: 979 for i in range(num_testbeds): 980 if os.path.isfile(f'testbed_{i}_completed.tmp'): 981 start = time.time() 982 completed_testbeds.add(i) 983 # Already reported _MAIN_TESTBED's results. 984 if len(completed_testbeds) == num_testbeds - 1: 985 logging.info('All testbeds completed, merging results.') 986 for parsed_id, parsed_camera, parsed_results in ( 987 parse_testbeds(completed_testbeds)): 988 logging.debug('Parsed id: %s, parsed cam: %s, parsed results: %s', 989 parsed_id, parsed_camera, parsed_results) 990 if not are_devices_similar(device_id, parsed_id): 991 logging.error('Device %s and device %s are not the same ' 992 'model/type/build/revision.', 993 device_id, parsed_id) 994 return 995 report_result(device_id, parsed_camera, parsed_results) 996 for temp_file in glob.glob('testbed_*.tmp'): 997 os.remove(temp_file) 998 break 999 else: 1000 logging.error('No testbeds finished in the last %d seconds, ' 1001 'but still expected data. ' 1002 'Completed testbed indices: %s, ' 1003 'expected number of testbeds: %d', 1004 MERGE_RESULTS_TIMEOUT, list(completed_testbeds), 1005 num_testbeds) 1006 else: 1007 with open(f'testbed_{testbed_index}_completed.tmp', 'w') as _: 1008 pass 1009 1010if __name__ == '__main__': 1011 main() 1012