1# Copyright 2014 The Android Open Source Project 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import glob 16import json 17import logging 18import os 19import os.path 20import re 21import subprocess 22import sys 23import tempfile 24import time 25import types 26 27import camera_properties_utils 28import capture_request_utils 29import image_processing_utils 30import its_device_utils 31import its_session_utils 32import lighting_control_utils 33import numpy as np 34import yaml 35 36 37YAML_FILE_DIR = os.environ['CAMERA_ITS_TOP'] 38CONFIG_FILE = os.path.join(YAML_FILE_DIR, 'config.yml') 39TEST_KEY_TABLET = 'tablet' 40TEST_KEY_SENSOR_FUSION = 'sensor_fusion' 41TEST_KEY_GEN2 = 'gen2' 42ACTIVITY_START_WAIT = 1.5 # seconds 43MERGE_RESULTS_TIMEOUT = 3600 # seconds 44 45NUM_TRIES = 2 46RESULT_PASS = 'PASS' 47RESULT_FAIL = 'FAIL' 48RESULT_NOT_EXECUTED = 'NOT_EXECUTED' 49RESULT_KEY = 'result' 50METRICS_KEY = 'mpc_metrics' 51PERFORMANCE_KEY = 'performance_metrics' 52FEATURE_QUERY_KEY = 'feature_query_proto' 53FEATURE_QUERY_PATH_KEY = 'feature_query_proto_path' 54SUMMARY_KEY = 'summary' 55RESULT_VALUES = (RESULT_PASS, RESULT_FAIL, RESULT_NOT_EXECUTED) 56CTS_VERIFIER_PACKAGE_NAME = 'com.android.cts.verifier' 57ACTION_ITS_RESULT = 'com.android.cts.verifier.camera.its.ACTION_ITS_RESULT' 58EXTRA_VERSION = 'camera.its.extra.VERSION' 59CURRENT_ITS_VERSION = '1.0' # version number to sync with CtsVerifier 60EXTRA_CAMERA_ID = 'camera.its.extra.CAMERA_ID' 61EXTRA_RESULTS = 'camera.its.extra.RESULTS' 62EXTRA_TABLET_NAME = 'camera.its.extra.TABLET_NAME' 63TIME_KEY_START = 'start' 64TIME_KEY_END = 'end' 65VALID_CONTROLLERS = ('arduino', 'canakit') 66_FRONT_CAMERA_ID = '1' 67# recover replaced '_' in scene def 68_INT_STR_DICT = types.MappingProxyType({'11': '1_1', '12': '1_2', '13': '1_3'}) 69_MAIN_TESTBED = 0 70_PROPERTIES_TO_MATCH = ( 71 'ro.product.model', 'ro.product.name', 'ro.build.display.id', 'ro.revision' 72) 73 74# Scenes that can be automated through tablet display 75# Notes on scene names: 76# scene*_1/2/... are same scene split to load balance run times for scenes 77# scene*_a/b/... are similar scenes that share one or more tests 78_TABLET_SCENES = ( 79 'scene0', 'scene1_1', 'scene1_2', 'scene1_3', 'scene2_a', 'scene2_b', 80 'scene2_c', 'scene2_d', 'scene2_e', 'scene2_f', 'scene2_g', 'scene3', 81 'scene4', 'scene6', 'scene7', 'scene8', 'scene9', 82 os.path.join('scene_extensions', 'scene_hdr'), 83 os.path.join('scene_extensions', 'scene_low_light'), 84 os.path.join('scene_tele', 'scene6_tele'), 85 os.path.join('scene_tele', 'scene7_tele'), 86 'scene_video', 87) 88 89# Scenes that use the 'sensor_fusion' test rig 90_MOTION_SCENES = ('sensor_fusion', 'feature_combination',) 91 92# Scenes that uses lighting control 93_FLASH_SCENES = ('scene_flash',) 94 95# Scenes that uses checkerboard as chart 96_CHECKERBOARD_SCENES = ('sensor_fusion', 'scene_flash', 'feature_combination',) 97 98# Scenes that have to be run manually regardless of configuration 99_MANUAL_SCENES = ('scene5',) 100 101# Scene extensions 102_EXTENSIONS_SCENES = (os.path.join('scene_extensions', 'scene_hdr'), 103 os.path.join('scene_extensions', 'scene_low_light'), 104 ) 105# Scene tele 106_TELE_SCENES = (os.path.join('scene_tele', 'scene6_tele'), 107 os.path.join('scene_tele', 'scene7_tele'), 108 ) 109_GEN2_RIG_SCENES = ('scene_ip',) 110# All possible scenes 111_ALL_SCENES = (_TABLET_SCENES + _MANUAL_SCENES + _MOTION_SCENES + 112 _FLASH_SCENES + _GEN2_RIG_SCENES) 113 114# Scenes that are logically grouped and can be called as group 115# scene6_tele is not grouped with scene6 because it requires extension rig 116_GROUPED_SCENES = types.MappingProxyType({ 117 'scene1': ('scene1_1', 'scene1_2', 'scene1_3'), 118 'scene2': ('scene2_a', 'scene2_b', 'scene2_c', 'scene2_d', 'scene2_e', 119 'scene2_f', 'scene2_g') 120}) 121 122# Scene requirements for manual testing. 123_SCENE_REQ = types.MappingProxyType({ 124 'scene0': None, 125 'scene1_1': 'A grey card covering at least the middle 30% of the scene', 126 'scene1_2': 'A grey card covering at least the middle 30% of the scene', 127 'scene1_3': 'A grey card covering at least the middle 30% of the scene, ' 128 'without a white border like scene1_1 or scene1_2', 129 'scene2_a': 'The picture with 3 faces in tests/scene2_a/scene2_a.png', 130 'scene2_b': 'The picture with 3 faces in tests/scene2_b/scene2_b.png', 131 'scene2_c': 'The picture with 3 faces in tests/scene2_c/scene2_c.png', 132 'scene2_d': 'The picture with 3 faces in tests/scene2_d/scene2_d.png', 133 'scene2_e': 'The picture with 3 faces in tests/scene2_e/scene2_e.png', 134 'scene2_f': 'The picture with 3 faces in tests/scene2_f/scene2_f.png', 135 'scene2_g': 'The picture with 3 profile faces in ' 136 'tests/scene2_g/scene2_g.png', 137 'scene3': 'The ISO12233 chart', 138 'scene4': 'A test chart of a circle covering at least the middle 50% of ' 139 'the scene. See tests/scene4/scene4.png', 140 'scene5': 'Capture images with a diffuser attached to the camera. See ' 141 'source.android.com/docs/compatibility/cts/camera-its-tests#scene5/diffuser ' # pylint: disable line-too-long 142 'for more details', 143 'scene6': 'A grid of ArUco markers on a white background. ' 144 'See tests/scene6/scene6.png', 145 'scene7': 'The picture with 4 different colors, slanted edge and' 146 '4 ArUco markers. See tests/scene7/scene7.png', 147 'scene8': 'The picture with 4 faces in 4 different colors overlay.' 148 'See tests/scene8/scene8.png', 149 'scene9': 'A scene with high entropy consisting of random size and colored ' 150 'circles. See tests/scene9/scene9.png', 151 # Use os.path to avoid confusion on other platforms 152 os.path.join('scene_extensions', 'scene_hdr'): ( 153 'A tablet displayed scene with a face on the left ' 154 'and a low-contrast QR code on the right. ' 155 'See tests/scene_extensions/scene_hdr/scene_hdr.png' 156 ), 157 os.path.join('scene_extensions', 'scene_low_light'): ( 158 'A tablet displayed scene with a grid of squares of varying ' 159 'brightness. See ' 160 'tests/scene_extensions/scene_low_light/scene_low_light.png' 161 ), 162 os.path.join('scene_tele', 'scene6_tele'): ( 163 'Identical to scene6, but for tele cameras. ' 164 'See tests/scene_tele/scene6_tele/scene6_tele.png' 165 ), 166 os.path.join('scene_tele', 'scene7_tele'): ( 167 'Identical to scene7, but for tele cameras. ' 168 'See tests/scene_tele/scene7_tele/scene7_tele.png' 169 ), 170 'sensor_fusion': 'A checkerboard pattern for phone to rotate in front of ' 171 'in tests/sensor_fusion/checkerboard.pdf\n' 172 'See tests/sensor_fusion/SensorFusion.pdf for detailed ' 173 'instructions.\nNote that this test will be skipped ' 174 'on devices not supporting REALTIME camera timestamp.', 175 'feature_combination': 'The same scene as sensor_fusion, ' 176 'separated for easier testing.', 177 'scene_flash': 'A checkerboard pattern chart with lights off.', 178 'scene_video': 'A tablet displayed scene with a series of circles moving ' 179 'at different simulated frame rates. ' 180 'See tests/scene_video/scene_video.mp4', 181 'scene_ip': 'A chart with features such as QR code, color checker chart, ' 182 'dead leaf patch, dynamic range chart used to analyze metrics ' 183 'such as brightness, sharpness, color accuracy.', 184}) 185 186# Made mutable to allow for test augmentation based on first API level 187SUB_CAMERA_TESTS = { 188 'scene0': ( 189 'test_jitter', 190 'test_metadata', 191 'test_request_capture_match', 192 'test_sensor_events', 193 'test_solid_color_test_pattern', 194 'test_unified_timestamps', 195 ), 196 'scene1_1': ( 197 'test_burst_capture', 198 'test_burst_sameness_manual', 199 'test_exposure_x_iso', 200 'test_linearity', 201 ), 202 'scene1_2': ( 203 'test_raw_exposure', 204 ), 205 'scene1_3': ( 206 'test_dng_noise_model', 207 'test_raw_sensitivity', 208 'test_yuv_plus_raw', 209 ), 210 'scene2_a': ( 211 'test_num_faces', 212 ), 213 'scene4': ( 214 'test_aspect_ratio_and_crop', 215 ), 216 'scene_tele/scene6_tele': ( 217 'test_zoom_tele', 218 'test_preview_zoom_tele', 219 ), 220 'scene_tele/scene7_tele': ( 221 'test_multi_camera_switch_tele', 222 ), 223 'scene_video': ( 224 'test_preview_frame_drop', 225 ), 226 'sensor_fusion': ( 227 'test_sensor_fusion', 228 ), 229} 230 231_LIGHTING_CONTROL_TESTS = ( 232 'test_auto_flash.py', 233 'test_preview_min_frame_rate.py', 234 'test_led_snapshot.py', 235 'test_night_extension.py', 236 'test_low_light_boost_extension.py', 237 'test_hdr_extension.py', 238 ) 239 240_EXTENSION_NAMES = ( 241 'hdr', 242 'low_light', 243) 244 245_DST_SCENE_DIR = '/sdcard/Download/' 246_DST_ROOT_DIR = '/sdcard/' 247_SUB_CAMERA_LEVELS = 2 248MOBLY_TEST_SUMMARY_TXT_FILE = 'test_mobly_summary.txt' 249 250 251def report_result(device_id, camera_id, tablet_name, results): 252 """Sends a pass/fail result to the device, via an intent. 253 254 Args: 255 device_id: The ID string of the device to report the results to. 256 camera_id: The ID string of the camera for which to report pass/fail. 257 tablet_name: The tablet name to identify model and build. 258 results: a dictionary contains all ITS scenes as key and result/summary of 259 current ITS run. See test_report_result unit test for an example. 260 """ 261 adb = f'adb -s {device_id}' 262 its_device_utils.start_its_test_activity(device_id) 263 time.sleep(ACTIVITY_START_WAIT) 264 265 # Validate/process results argument 266 for scene in results: 267 if RESULT_KEY not in results[scene]: 268 raise ValueError(f'ITS result not found for {scene}') 269 if results[scene][RESULT_KEY] not in RESULT_VALUES: 270 raise ValueError(f'Unknown ITS result for {scene}: {results[RESULT_KEY]}') 271 if SUMMARY_KEY in results[scene]: 272 device_summary_path = f'/sdcard/its_camera{camera_id}_{scene}.txt' 273 its_device_utils.run( 274 f'{adb} push {results[scene][SUMMARY_KEY]} {device_summary_path}') 275 results[scene][SUMMARY_KEY] = device_summary_path 276 if (FEATURE_QUERY_KEY in results[scene] and 277 FEATURE_QUERY_PATH_KEY in results[scene]): 278 for (proto_path, proto_file) in zip( 279 results[scene][FEATURE_QUERY_PATH_KEY], 280 results[scene][FEATURE_QUERY_KEY]): 281 host_path = os.path.join(proto_path, proto_file) 282 its_device_utils.run( 283 f'{adb} push {host_path} {_DST_ROOT_DIR}') 284 285 json_results = json.dumps(results) 286 cmd = (f"{adb} shell am broadcast -a {ACTION_ITS_RESULT} --es {EXTRA_VERSION}" 287 f" {CURRENT_ITS_VERSION} --es {EXTRA_CAMERA_ID} {camera_id} --es " 288 f"{EXTRA_TABLET_NAME} {tablet_name} --es " 289 f"{EXTRA_RESULTS} \'{json_results}\'") 290 its_device_utils.run(cmd) 291 292 293def write_result(testbed_index, device_id, camera_id, results): 294 """Writes results to a temporary file for merging. 295 296 Args: 297 testbed_index: the index of a finished testbed. 298 device_id: the ID string of the device that created results. 299 camera_id: the ID string of the camera of the device. 300 results: a dictionary that contains all ITS scenes as key 301 and result/summary of current ITS run. 302 """ 303 result = {'device_id': device_id, 'results': results} 304 file_name = f'testbed_{testbed_index}_camera_{camera_id}.tmp' 305 with open(file_name, 'w') as f: 306 json.dump(result, f) 307 308 309def parse_testbeds(completed_testbeds): 310 """Parses completed testbeds and yields device_id, camera_id, and results. 311 312 Args: 313 completed_testbeds: an iterable of completed testbed indices. 314 Yields: 315 device_id: the device associated with the testbed. 316 camera_id: one of the camera_ids associated with the testbed. 317 results: the dictionary with scenes and result/summary of testbed's run. 318 """ 319 for i in completed_testbeds: 320 for file_name in glob.glob(f'testbed_{i}_camera_*.tmp'): 321 camera_id = file_name.split('camera_')[1].split('.tmp')[0] 322 device_id = '' 323 results = {} 324 with open(file_name, 'r') as f: 325 testbed_data = json.load(f) 326 device_id = testbed_data['device_id'] 327 results = testbed_data['results'] 328 if not device_id or not results: 329 raise ValueError(f'device_id or results for {file_name} not found.') 330 yield device_id, camera_id, results 331 332 333def get_device_property(device_id, property_name): 334 """Get property of a given device. 335 336 Args: 337 device_id: the ID string of a device. 338 property_name: the desired property string. 339 Returns: 340 The value of the property. 341 """ 342 property_cmd = f'adb -s {device_id} shell getprop {property_name}' 343 raw_output = subprocess.check_output( 344 property_cmd, stderr=subprocess.STDOUT, shell=True) 345 return str(raw_output.decode('utf-8')).strip() 346 347 348def are_devices_similar(device_id_1, device_id_2): 349 """Checks if key dimensions are the same between devices. 350 351 Args: 352 device_id_1: the ID string of the _MAIN_TESTBED device. 353 device_id_2: the ID string of another device. 354 Returns: 355 True if both devices share key dimensions. 356 """ 357 for property_to_match in _PROPERTIES_TO_MATCH: 358 property_value_1 = get_device_property(device_id_1, property_to_match) 359 property_value_2 = get_device_property(device_id_2, property_to_match) 360 if property_value_1 != property_value_2: 361 logging.error('%s does not match %s for %s', 362 property_value_1, property_value_2, property_to_match) 363 return False 364 return True 365 366 367def check_manual_scenes(device_id, camera_id, scene, out_path): 368 """Halt run to change scenes. 369 370 Args: 371 device_id: id of device 372 camera_id: id of camera 373 scene: Name of the scene to copy image files. 374 out_path: output file location 375 """ 376 hidden_physical_id = None 377 if its_session_utils.SUB_CAMERA_SEPARATOR in camera_id: 378 split_camera_ids = camera_id.split(its_session_utils.SUB_CAMERA_SEPARATOR) 379 if len(split_camera_ids) == _SUB_CAMERA_LEVELS: 380 camera_id = split_camera_ids[0] 381 hidden_physical_id = split_camera_ids[1] 382 383 with its_session_utils.ItsSession( 384 device_id=device_id, 385 camera_id=camera_id, 386 hidden_physical_id=hidden_physical_id) as cam: 387 props = cam.get_camera_properties() 388 props = cam.override_with_hidden_physical_camera_props(props) 389 390 while True: 391 input(f'\n Press <ENTER> after positioning camera {camera_id} with ' 392 f'{scene}.\n The scene setup should be: \n {_SCENE_REQ[scene]}\n') 393 # Converge 3A prior to capture 394 if scene == 'scene5': 395 cam.do_3a(do_af=False, lock_ae=camera_properties_utils.ae_lock(props), 396 lock_awb=camera_properties_utils.awb_lock(props)) 397 else: 398 cam.do_3a() 399 req, fmt = capture_request_utils.get_fastest_auto_capture_settings(props) 400 logging.info('Capturing an image to check the test scene') 401 cap = cam.do_capture(req, fmt) 402 img = image_processing_utils.convert_capture_to_rgb_image(cap) 403 img_name = os.path.join(out_path, f'test_{scene.replace("/", "_")}.jpg') 404 logging.info('Please check scene setup in %s', img_name) 405 image_processing_utils.write_image(img, img_name) 406 choice = input(f'Is the image okay for ITS {scene}? (Y/N)').lower() 407 if choice == 'y': 408 break 409 410 411def get_config_file_contents(): 412 """Read the config file contents from a YML file. 413 414 Args: 415 None 416 417 Returns: 418 config_file_contents: a dict read from config.yml 419 """ 420 with open(CONFIG_FILE) as file: 421 config_file_contents = yaml.safe_load(file) 422 return config_file_contents 423 424 425def get_test_params(config_file_contents): 426 """Reads the config file parameters. 427 428 Args: 429 config_file_contents: dict read from config.yml file 430 431 Returns: 432 dict of test parameters 433 """ 434 test_params = None 435 for _, j in config_file_contents.items(): 436 for datadict in j: 437 test_params = datadict.get('TestParams') 438 return test_params 439 440 441def get_device_serial_number(device, config_file_contents): 442 """Returns the serial number of the device with label from the config file. 443 444 The config file contains TestBeds dictionary which contains Controllers and 445 Android Device dicts.The two devices used by the test per box are listed 446 here labels dut and tablet. Parse through the nested TestBeds dict to get 447 the Android device details. 448 449 Args: 450 device: String device label as specified in config file.dut/tablet 451 config_file_contents: dict read from config.yml file 452 """ 453 454 for _, j in config_file_contents.items(): 455 for datadict in j: 456 android_device_contents = datadict.get('Controllers') 457 for device_dict in android_device_contents.get('AndroidDevice'): 458 for _, label in device_dict.items(): 459 if label == 'tablet': 460 tablet_device_id = str(device_dict.get('serial')) 461 if label == 'dut': 462 dut_device_id = str(device_dict.get('serial')) 463 if device == 'tablet': 464 return tablet_device_id 465 else: 466 return dut_device_id 467 468 469def get_updated_yml_file(yml_file_contents): 470 """Create a new yml file and write the testbed contents in it. 471 472 This testbed file is per box and contains all the parameters and 473 device id used by the mobly tests. 474 475 Args: 476 yml_file_contents: Data to write in yml file. 477 478 Returns: 479 Updated yml file contents. 480 """ 481 os.chmod(YAML_FILE_DIR, 0o755) 482 file_descriptor, new_yaml_file = tempfile.mkstemp( 483 suffix='.yml', prefix='config_', dir=YAML_FILE_DIR) 484 os.close(file_descriptor) 485 with open(new_yaml_file, 'w') as f: 486 yaml.dump(yml_file_contents, stream=f, default_flow_style=False) 487 new_yaml_file_name = os.path.basename(new_yaml_file) 488 return new_yaml_file_name 489 490 491def enable_external_storage(device_id): 492 """Override apk mode to allow write to external storage. 493 494 Args: 495 device_id: Serial number of the device. 496 497 """ 498 cmd = (f'adb -s {device_id} shell appops ' 499 'set com.android.cts.verifier MANAGE_EXTERNAL_STORAGE allow') 500 its_device_utils.run(cmd) 501 502 503def get_available_cameras(device_id, camera_id): 504 """Get available camera devices in the current state. 505 506 Args: 507 device_id: Serial number of the device. 508 camera_id: Logical camera_id 509 510 Returns: 511 List of all the available camera_ids. 512 """ 513 with its_session_utils.ItsSession( 514 device_id=device_id, 515 camera_id=camera_id) as cam: 516 props = cam.get_camera_properties() 517 props = cam.override_with_hidden_physical_camera_props(props) 518 unavailable_physical_cameras = cam.get_unavailable_physical_cameras( 519 camera_id) 520 unavailable_physical_ids = unavailable_physical_cameras[ 521 'unavailablePhysicalCamerasArray'] 522 output = cam.get_camera_ids() 523 all_camera_ids = output['cameraIdArray'] 524 # Concat camera_id, physical camera_id and sub camera separator 525 unavailable_physical_ids = [f'{camera_id}.{s}' 526 for s in unavailable_physical_ids] 527 for i in unavailable_physical_ids: 528 if i in all_camera_ids: 529 all_camera_ids.remove(i) 530 logging.debug('available camera ids: %s', all_camera_ids) 531 return all_camera_ids 532 533 534def get_unavailable_physical_cameras(device_id, camera_id): 535 """Get unavailable physical cameras in the current state. 536 537 Args: 538 device_id: Serial number of the device. 539 camera_id: Logical camera device id 540 541 Returns: 542 List of all the unavailable camera_ids. 543 """ 544 with its_session_utils.ItsSession( 545 device_id=device_id, 546 camera_id=camera_id) as cam: 547 unavailable_physical_cameras = cam.get_unavailable_physical_cameras( 548 camera_id) 549 unavailable_physical_ids = unavailable_physical_cameras[ 550 'unavailablePhysicalCamerasArray'] 551 unavailable_physical_ids = [f'{camera_id}.{s}' 552 for s in unavailable_physical_ids] 553 logging.debug('Unavailable physical camera ids: %s', 554 unavailable_physical_ids) 555 return unavailable_physical_ids 556 557 558def is_device_folded(device_id): 559 """Returns True if the foldable device is in folded state. 560 561 Args: 562 device_id: Serial number of the foldable device. 563 """ 564 cmd = (f'adb -s {device_id} shell cmd device_state state') 565 result = subprocess.getoutput(cmd) 566 if 'CLOSE' in result: 567 return True 568 return False 569 570 571def augment_sub_camera_tests(first_api_level): 572 """Adds certain tests to SUB_CAMERA_TESTS depending on first_api_level. 573 574 Args: 575 first_api_level: First api level of the device. 576 """ 577 if (first_api_level >= its_session_utils.ANDROID15_API_LEVEL): 578 logging.debug('Augmenting sub camera tests') 579 SUB_CAMERA_TESTS['scene6'] = ('test_in_sensor_zoom',) 580 581 582def main(): 583 """Run all the Camera ITS automated tests. 584 585 Script should be run from the top-level CameraITS directory. 586 587 Command line arguments: 588 camera: the camera(s) to be tested. Use comma to separate multiple 589 camera Ids. Ex: "camera=0,1" or "camera=1" 590 scenes: the test scene(s) to be executed. Use comma to separate 591 multiple scenes. Ex: "scenes=scene0,scene1_1" or 592 "scenes=0,1_1,sensor_fusion" (sceneX can be abbreviated by X 593 where X is scene name minus 'scene') 594 """ 595 logging.basicConfig(level=logging.INFO) 596 # Make output directories to hold the generated files. 597 topdir = tempfile.mkdtemp(prefix='CameraITS_') 598 try: 599 subprocess.call(['chmod', 'g+rx', topdir]) 600 except OSError as e: 601 logging.info(repr(e)) 602 603 scenes = [] 604 camera_id_combos = [] 605 testbed_index = None 606 num_testbeds = None 607 608 # Override camera, scenes and testbed with cmd line values if available 609 for s in list(sys.argv[1:]): 610 if 'scenes=' in s: 611 scenes = s.split('scenes=')[1].split(',') 612 elif 'camera=' in s: 613 camera_id_combos = s.split('=')[1].split(',') 614 elif 'testbed_index=' in s: 615 testbed_index = int(s.split('=')[1]) 616 elif 'num_testbeds=' in s: 617 num_testbeds = int(s.split('=')[1]) 618 else: 619 raise ValueError(f'Unknown argument {s}') 620 if testbed_index is None and num_testbeds is not None: 621 raise ValueError( 622 'testbed_index must be specified if num_testbeds is specified.') 623 if (testbed_index is not None and num_testbeds is not None and 624 testbed_index >= num_testbeds): 625 raise ValueError('testbed_index must be less than num_testbeds. ' 626 'testbed_index starts at 0.') 627 628 # Prepend 'scene' if not specified at cmd line 629 for i, s in enumerate(scenes): 630 if (not s.startswith('scene') and 631 not s.startswith(('checkerboard', 'sensor_fusion', 632 'flash', 'feature_combination', '<scene-name>'))): 633 scenes[i] = f'scene{s}' 634 if (s.startswith('flash') or s.startswith('extensions') 635 or s.startswith('ip')): 636 scenes[i] = f'scene_{s}' 637 # Handle scene_extensions 638 if any(s.startswith(extension) for extension in _EXTENSION_NAMES): 639 scenes[i] = f'scene_extensions/scene_{s}' 640 if (any(s.startswith('scene_' + extension) 641 for extension in _EXTENSION_NAMES)): 642 scenes[i] = f'scene_extensions/{s}' 643 # Handle scene_tele 644 if s == 'scene6_tele' or s == 'scene7_tele': 645 scenes[i] = f'scene_tele/{s}' 646 647 # Read config file and extract relevant TestBed 648 config_file_contents = get_config_file_contents() 649 if testbed_index is None: 650 testbed_to_remove = [] 651 for i in config_file_contents['TestBeds']: 652 name = i['Name'].lower() 653 if scenes == ['scene_ip']: 654 if TEST_KEY_GEN2 not in name: 655 testbed_to_remove.append(i) 656 elif set(scenes).intersection( 657 set(_CHECKERBOARD_SCENES)) or scenes == ['checkerboard']: 658 if TEST_KEY_SENSOR_FUSION not in name: 659 testbed_to_remove.append(i) 660 else: 661 if TEST_KEY_SENSOR_FUSION in name or TEST_KEY_GEN2 in name: 662 testbed_to_remove.append(i) 663 for i in testbed_to_remove: 664 config_file_contents['TestBeds'].remove(i) 665 else: 666 config_file_contents = { 667 'TestBeds': [config_file_contents['TestBeds'][testbed_index]] 668 } 669 670 # Get test parameters from config file 671 test_params_content = get_test_params(config_file_contents) 672 if not camera_id_combos: 673 camera_id_combos = str(test_params_content['camera']).split(',') 674 if not scenes: 675 scenes = str(test_params_content['scene']).split(',') 676 scenes = [_INT_STR_DICT.get(n, n) for n in scenes] # recover '1_1' & '1_2' 677 678 device_id = get_device_serial_number('dut', config_file_contents) 679 # Enable external storage on DUT to send summary report to CtsVerifier.apk 680 enable_external_storage(device_id) 681 682 # Add to SUB_CAMERA_TESTS depending on first_api_level 683 augment_sub_camera_tests(its_session_utils.get_first_api_level(device_id)) 684 685 # Verify that CTS Verifier is installed 686 its_session_utils.check_apk_installed(device_id, CTS_VERIFIER_PACKAGE_NAME) 687 # Check whether the dut is foldable or not 688 testing_foldable_device = True if test_params_content[ 689 'foldable_device'] == 'True' else False 690 available_camera_ids_to_test_foldable = [] 691 if testing_foldable_device: 692 logging.debug('Testing foldable device.') 693 # Check the state of foldable device. True if device is folded, 694 # false if the device is opened. 695 device_folded = is_device_folded(device_id) 696 # list of available camera_ids to be tested in device state 697 available_camera_ids_to_test_foldable = get_available_cameras( 698 device_id, _FRONT_CAMERA_ID) 699 700 config_file_test_key = config_file_contents['TestBeds'][0]['Name'].lower() 701 logging.info('Saving %s output files to: %s', config_file_test_key, topdir) 702 if TEST_KEY_TABLET in config_file_test_key: 703 tablet_id = get_device_serial_number('tablet', config_file_contents) 704 tablet_name_cmd = f'adb -s {tablet_id} shell getprop ro.product.device' 705 raw_output = subprocess.check_output( 706 tablet_name_cmd, stderr=subprocess.STDOUT, shell=True) 707 tablet_name = str(raw_output.decode('utf-8')).strip() 708 logging.debug('Tablet name: %s', tablet_name) 709 brightness = test_params_content['brightness'] 710 its_session_utils.validate_tablet(tablet_name, brightness, tablet_id) 711 elif TEST_KEY_GEN2 in config_file_test_key: 712 tablet_id = None 713 tablet_name = 'ip_chart' 714 else: 715 tablet_id = None 716 tablet_name = 'sensor_fusion' 717 718 testing_sensor_fusion_with_controller = False 719 if TEST_KEY_SENSOR_FUSION in config_file_test_key: 720 if test_params_content['rotator_cntl'].lower() in VALID_CONTROLLERS: 721 testing_sensor_fusion_with_controller = True 722 723 testing_flash_with_controller = False 724 if (test_params_content.get('lighting_cntl', 'None').lower() == 'arduino' and 725 'manual' not in config_file_test_key): 726 testing_flash_with_controller = True 727 728 # Expand GROUPED_SCENES and remove any duplicates 729 scenes = [_GROUPED_SCENES[s] if s in _GROUPED_SCENES else s for s in scenes] 730 scenes = np.hstack(scenes).tolist() 731 scenes = sorted(set(scenes), key=scenes.index) 732 # List of scenes to be executed in folded state will have '_folded' 733 # prefix. This will help distinguish the test results from folded vs 734 # open device state for front camera_ids. 735 folded_device_scenes = [] 736 for scene in scenes: 737 folded_device_scenes.append(f'{scene}_folded') 738 739 logging.info('Running ITS on device: %s, camera(s): %s, scene(s): %s', 740 device_id, camera_id_combos, scenes) 741 742 # Determine if manual run 743 if tablet_id is not None and not set(scenes).intersection(_MANUAL_SCENES): 744 auto_scene_switch = True 745 else: 746 auto_scene_switch = False 747 logging.info('Manual, checkerboard scenes, scene5 or scene_ip testing.') 748 749 folded_prompted = False 750 opened_prompted = False 751 for camera_id in camera_id_combos: 752 test_params_content['camera'] = camera_id 753 results = {} 754 unav_cameras = [] 755 # Get the list of unavailable cameras in current device state. 756 # These camera_ids should not be tested in current device state. 757 if testing_foldable_device: 758 unav_cameras = get_unavailable_physical_cameras( 759 device_id, _FRONT_CAMERA_ID) 760 761 if testing_foldable_device: 762 device_state = 'folded' if device_folded else 'opened' 763 764 testing_folded_front_camera = (testing_foldable_device and 765 device_folded and 766 _FRONT_CAMERA_ID in camera_id) 767 768 # Raise an assertion error if there is any camera unavailable in 769 # current device state. Usually scenes with suffix 'folded' will 770 # be executed in folded state. 771 if (testing_foldable_device and 772 _FRONT_CAMERA_ID in camera_id and camera_id in unav_cameras): 773 raise AssertionError( 774 f'Camera {camera_id} is unavailable in device state {device_state}' 775 f' and cannot be tested with device {device_state}!') 776 777 if (testing_folded_front_camera and camera_id not in unav_cameras 778 and not folded_prompted): 779 folded_prompted = True 780 input('\nYou are testing a foldable device in folded state. ' 781 'Please make sure the device is folded and press <ENTER> ' 782 'after positioning properly.\n') 783 784 if (testing_foldable_device and 785 not device_folded and _FRONT_CAMERA_ID in camera_id and 786 camera_id not in unav_cameras and not opened_prompted): 787 opened_prompted = True 788 input('\nYou are testing a foldable device in opened state. ' 789 'Please make sure the device is unfolded and press <ENTER> ' 790 'after positioning properly.\n') 791 792 # Run through all scenes if user does not supply one and config file doesn't 793 # have specific scene name listed. 794 if TEST_KEY_SENSOR_FUSION in config_file_test_key: 795 possible_scenes = _CHECKERBOARD_SCENES 796 elif its_session_utils.SUB_CAMERA_SEPARATOR in camera_id: 797 possible_scenes = list(SUB_CAMERA_TESTS.keys()) 798 if auto_scene_switch: 799 possible_scenes.remove('sensor_fusion') 800 if 'scene_tele' in scenes: 801 possible_scenes = _TELE_SCENES 802 else: 803 if 'scene_extensions' in scenes: 804 possible_scenes = _EXTENSIONS_SCENES 805 elif 'scene_tele' in scenes: 806 possible_scenes = _TELE_SCENES 807 elif 'scene_ip' in scenes: 808 possible_scenes = _GEN2_RIG_SCENES 809 else: 810 possible_scenes = _TABLET_SCENES if auto_scene_switch else _ALL_SCENES 811 if ('<scene-name>' in scenes or 'checkerboard' in scenes or 812 'scene_extensions' in scenes or 'scene_tele' in scenes 813 or 'scene_ip' in scenes): 814 per_camera_scenes = possible_scenes 815 else: 816 # Validate user input scene names 817 per_camera_scenes = [] 818 for s in scenes: 819 if s in possible_scenes: 820 per_camera_scenes.append(s) 821 if not per_camera_scenes: 822 raise ValueError('No valid scene specified for this camera.') 823 824 # Folded state scenes will have 'folded' suffix only for 825 # front cameras since rear cameras are common in both folded 826 # and unfolded state. 827 foldable_per_camera_scenes = [] 828 if testing_folded_front_camera: 829 if camera_id not in available_camera_ids_to_test_foldable: 830 raise AssertionError(f'camera {camera_id} is not available.') 831 for s in per_camera_scenes: 832 foldable_per_camera_scenes.append(f'{s}_folded') 833 834 if foldable_per_camera_scenes: 835 per_camera_scenes = foldable_per_camera_scenes 836 837 logging.info('camera: %s, scene(s): %s', camera_id, per_camera_scenes) 838 839 if testing_folded_front_camera: 840 all_scenes = [f'{s}_folded' for s in _ALL_SCENES] 841 else: 842 all_scenes = _ALL_SCENES 843 844 for s in all_scenes: 845 results[s] = {RESULT_KEY: RESULT_NOT_EXECUTED} 846 847 # assert device folded testing scenes with suffix 'folded' 848 if testing_foldable_device and 'folded' in s: 849 if not device_folded: 850 raise AssertionError('Device should be folded during' 851 ' testing scenes with suffix "folded"') 852 853 # A subdir in topdir will be created for each camera_id. All scene test 854 # output logs for each camera id will be stored in this subdir. 855 # This output log path is a mobly param : LogPath 856 camera_id_str = ( 857 camera_id.replace(its_session_utils.SUB_CAMERA_SEPARATOR, '_') 858 ) 859 mobly_output_logs_path = os.path.join(topdir, f'cam_id_{camera_id_str}') 860 os.mkdir(mobly_output_logs_path) 861 tot_pass = 0 862 for s in per_camera_scenes: 863 results[s]['TEST_STATUS'] = [] 864 results[s][METRICS_KEY] = [] 865 results[s][PERFORMANCE_KEY] = [] 866 results[s][FEATURE_QUERY_KEY] = [] 867 results[s][FEATURE_QUERY_PATH_KEY] = [] 868 869 # unit is millisecond for execution time record in CtsVerifier 870 scene_start_time = int(round(time.time() * 1000)) 871 scene_test_summary = f'Cam{camera_id} {s}' + '\n' 872 mobly_scene_output_logs_path = os.path.join(mobly_output_logs_path, s) 873 874 # Since test directories do not have 'folded' in the name, we need 875 # to remove that suffix for the path of the scenes to be loaded 876 # on the tablets 877 testing_scene = s 878 if 'folded' in s: 879 testing_scene = s.split('_folded')[0] 880 test_params_content['scene'] = testing_scene 881 test_params_content['scene_with_suffix'] = s 882 883 if auto_scene_switch: 884 # Copy scene images onto the tablet 885 if 'scene0' not in testing_scene: 886 its_session_utils.copy_scenes_to_tablet(testing_scene, tablet_id) 887 else: 888 # Check manual scenes for correctness 889 if ('scene0' not in testing_scene and 890 'scene_ip' not in testing_scene and 891 not testing_sensor_fusion_with_controller): 892 check_manual_scenes(device_id, camera_id, testing_scene, 893 mobly_output_logs_path) 894 895 scene_test_list = [] 896 config_file_contents['TestBeds'][0]['TestParams'] = test_params_content 897 # Add the MoblyParams to config.yml file with the path to store camera_id 898 # test results. This is a separate dict other than TestBeds. 899 mobly_params_dict = { 900 'MoblyParams': { 901 'LogPath': mobly_scene_output_logs_path 902 } 903 } 904 config_file_contents.update(mobly_params_dict) 905 logging.debug('Final config file contents: %s', config_file_contents) 906 new_yml_file_name = get_updated_yml_file(config_file_contents) 907 logging.info('Using %s as temporary config yml file', new_yml_file_name) 908 if camera_id.rfind(its_session_utils.SUB_CAMERA_SEPARATOR) == -1: 909 scene_dir = os.listdir( 910 os.path.join(os.environ['CAMERA_ITS_TOP'], 'tests', testing_scene)) 911 for file_name in scene_dir: 912 if file_name.endswith('.py') and 'test' in file_name: 913 scene_test_list.append(file_name) 914 else: # sub-camera 915 if SUB_CAMERA_TESTS.get(testing_scene): 916 scene_test_list = [f'{test}.py' for test in SUB_CAMERA_TESTS[ 917 testing_scene]] 918 else: 919 scene_test_list = [] 920 scene_test_list.sort() 921 922 # Run tests for scene 923 logging.info('Running tests for %s with camera %s', 924 testing_scene, camera_id) 925 num_pass = 0 926 num_skip = 0 927 num_not_mandated_fail = 0 928 num_fail = 0 929 for test in scene_test_list: 930 # Handle repeated test 931 if 'tests/' in test: 932 cmd = [ 933 'python3', 934 os.path.join(os.environ['CAMERA_ITS_TOP'], test), '-c', 935 f'{new_yml_file_name}' 936 ] 937 else: 938 cmd = [ 939 'python3', 940 os.path.join(os.environ['CAMERA_ITS_TOP'], 'tests', 941 testing_scene, test), 942 '-c', 943 f'{new_yml_file_name}' 944 ] 945 return_string = '' 946 for num_try in range(NUM_TRIES): 947 # Saves to mobly test summary file 948 # print only messages for manual lighting control testing 949 output = subprocess.Popen( 950 cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT 951 ) 952 with output.stdout, open( 953 os.path.join(topdir, MOBLY_TEST_SUMMARY_TXT_FILE), 'wb' 954 ) as file: 955 for line in iter(output.stdout.readline, b''): 956 out = line.decode('utf-8').strip() 957 if '<ENTER>' in out: print(out) 958 file.write(line) 959 output.wait() 960 961 # Parse mobly logs to determine PASS/FAIL(*)/SKIP & socket FAILs 962 with open( 963 os.path.join(topdir, MOBLY_TEST_SUMMARY_TXT_FILE), 'r') as file: 964 test_code = output.returncode 965 test_skipped = False 966 test_not_yet_mandated = False 967 test_mpc_req = '' 968 perf_test_metrics = '' 969 hdr_mpc_req = '' 970 feature_query_proto = '' 971 content = file.read() 972 root_output_path = '' 973 974 # Find media performance class logging 975 lines = content.splitlines() 976 for one_line in lines: 977 # regular expression pattern must match 978 # MPC12_CAMERA_LAUNCH_PATTERN or MPC12_JPEG_CAPTURE_PATTERN in 979 # ItsTestActivity.java. 980 mpc_string_match = re.search( 981 '^(1080p_jpeg_capture_time_ms:|camera_launch_time_ms:)', 982 one_line) 983 if mpc_string_match: 984 test_mpc_req = one_line 985 break 986 987 for one_line in lines: 988 # regular expression pattern must match in ItsTestActivity.java. 989 gainmap_string_match = re.search('^has_gainmap:', one_line) 990 if gainmap_string_match: 991 hdr_mpc_req = one_line 992 break 993 994 # Find root output path used to store feature combination query 995 # proto 996 for one_line in lines: 997 if 'root_output_path:' in one_line: 998 root_output_path = one_line.split(':')[1].strip() 999 # Find feature combination query proto 1000 for one_line in lines: 1001 # regular expression pattern must match in ItsTestActivity.java. 1002 feature_comb_query_string_match = re.search( 1003 '^feature_query_proto:(.*)', one_line 1004 ) 1005 if feature_comb_query_string_match: 1006 feature_query_proto = feature_comb_query_string_match.group(1) 1007 break 1008 1009 # Find performance metrics logging 1010 for one_line in lines: 1011 # regular expression pattern must match in ItsTestActivity.java. 1012 perf_metrics_string_match = re.search( 1013 '^test.*:', 1014 one_line) 1015 if perf_metrics_string_match: 1016 perf_test_metrics = one_line 1017 # each test can add multiple metrics 1018 results[s][PERFORMANCE_KEY].append(perf_test_metrics) 1019 1020 if 'Test skipped' in content: 1021 return_string = 'SKIP ' 1022 num_skip += 1 1023 test_skipped = True 1024 break 1025 1026 if its_session_utils.NOT_YET_MANDATED_MESSAGE in content: 1027 return_string = 'FAIL*' 1028 num_not_mandated_fail += 1 1029 test_not_yet_mandated = True 1030 break 1031 1032 if test_code == 0 and not test_skipped: 1033 return_string = 'PASS ' 1034 num_pass += 1 1035 break 1036 1037 if test_code != 0 and not test_not_yet_mandated: 1038 return_string = 'FAIL ' 1039 if 'Problem with socket' in content and num_try != NUM_TRIES-1: 1040 logging.info('Retry %s/%s', s, test) 1041 else: 1042 num_fail += 1 1043 break 1044 os.remove(os.path.join(topdir, MOBLY_TEST_SUMMARY_TXT_FILE)) 1045 status_prefix = '' 1046 if testbed_index is not None: 1047 status_prefix = config_file_test_key + ':' 1048 logging.info('%s%s %s/%s', status_prefix, return_string, s, test) 1049 test_name = test.split('/')[-1].split('.')[0] 1050 results[s]['TEST_STATUS'].append({ 1051 'test': test_name, 1052 'status': return_string.strip()}) 1053 if test_mpc_req: 1054 results[s][METRICS_KEY].append(test_mpc_req) 1055 if hdr_mpc_req: 1056 results[s][METRICS_KEY].append(hdr_mpc_req) 1057 if feature_query_proto and root_output_path: 1058 results[s][FEATURE_QUERY_KEY].append(feature_query_proto) 1059 results[s][FEATURE_QUERY_PATH_KEY].append(root_output_path) 1060 1061 msg_short = f'{return_string} {test}' 1062 scene_test_summary += msg_short + '\n' 1063 if (test in _LIGHTING_CONTROL_TESTS and 1064 not testing_flash_with_controller): 1065 print('Turn lights ON in rig and press <ENTER> to continue.') 1066 1067 # unit is millisecond for execution time record in CtsVerifier 1068 scene_end_time = int(round(time.time() * 1000)) 1069 skip_string = '' 1070 tot_tests = len(scene_test_list) 1071 tot_tests_run = tot_tests - num_skip 1072 if tot_tests_run != 0: 1073 tests_passed_ratio = (num_pass + num_not_mandated_fail) / tot_tests_run 1074 else: 1075 tests_passed_ratio = (num_pass + num_not_mandated_fail) / 100.0 1076 tests_passed_ratio_format = f'{(100 * tests_passed_ratio):.1f}%' 1077 if num_skip > 0: 1078 skip_string = f",{num_skip} test{'s' if num_skip > 1 else ''} skipped" 1079 test_result = (f'{num_pass + num_not_mandated_fail} / {tot_tests_run} ' 1080 f'tests passed ({tests_passed_ratio_format}){skip_string}') 1081 logging.info(test_result) 1082 if num_not_mandated_fail > 0: 1083 logging.info('(*) %s not_yet_mandated tests failed', 1084 num_not_mandated_fail) 1085 1086 tot_pass += num_pass 1087 logging.info('scene tests: %s, Total tests passed: %s', tot_tests, 1088 tot_pass) 1089 if tot_tests > 0: 1090 logging.info('%s compatibility score: %.f/100\n', 1091 s, 100 * num_pass / tot_tests) 1092 scene_test_summary_path = os.path.join(mobly_scene_output_logs_path, 1093 'scene_test_summary.txt') 1094 with open(scene_test_summary_path, 'w') as f: 1095 f.write(scene_test_summary) 1096 results[s][RESULT_KEY] = (RESULT_PASS if num_fail == 0 else RESULT_FAIL) 1097 results[s][SUMMARY_KEY] = scene_test_summary_path 1098 results[s][TIME_KEY_START] = scene_start_time 1099 results[s][TIME_KEY_END] = scene_end_time 1100 else: 1101 logging.info('%s compatibility score: 0/100\n') 1102 1103 # Delete temporary yml file after scene run. 1104 new_yaml_file_path = os.path.join(YAML_FILE_DIR, new_yml_file_name) 1105 os.remove(new_yaml_file_path) 1106 # Log results per camera 1107 if num_testbeds is None or testbed_index == _MAIN_TESTBED: 1108 logging.info('Reporting camera %s ITS results to CtsVerifier', camera_id) 1109 logging.info('ITS results to CtsVerifier: %s', results) 1110 report_result(device_id, camera_id, tablet_name, results) 1111 else: 1112 write_result(testbed_index, device_id, camera_id, results) 1113 1114 logging.info('Test execution completed.') 1115 1116 # Power down tablet 1117 if tablet_id: 1118 cmd = f'adb -s {tablet_id} shell input keyevent KEYCODE_POWER' 1119 subprocess.Popen(cmd.split()) 1120 1121 # establish connection with lighting controller 1122 lighting_cntl = test_params_content.get('lighting_cntl', 'None') 1123 lighting_ch = test_params_content.get('lighting_ch', 'None') 1124 arduino_serial_port = lighting_control_utils.lighting_control( 1125 lighting_cntl, lighting_ch) 1126 1127 # turn OFF lights 1128 lighting_control_utils.set_lighting_state( 1129 arduino_serial_port, lighting_ch, 'OFF') 1130 1131 if num_testbeds is not None: 1132 if testbed_index == _MAIN_TESTBED: 1133 logging.info('Waiting for all testbeds to finish.') 1134 start = time.time() 1135 completed_testbeds = set() 1136 while time.time() < start + MERGE_RESULTS_TIMEOUT: 1137 for i in range(num_testbeds): 1138 if os.path.isfile(f'testbed_{i}_completed.tmp'): 1139 start = time.time() 1140 completed_testbeds.add(i) 1141 # Already reported _MAIN_TESTBED's results. 1142 if len(completed_testbeds) == num_testbeds - 1: 1143 logging.info('All testbeds completed, merging results.') 1144 for parsed_id, parsed_camera, parsed_results in ( 1145 parse_testbeds(completed_testbeds)): 1146 logging.debug('Parsed id: %s, parsed cam: %s, parsed results: %s', 1147 parsed_id, parsed_camera, parsed_results) 1148 if not are_devices_similar(device_id, parsed_id): 1149 logging.error('Device %s and device %s are not the same ' 1150 'model/type/build/revision.', 1151 device_id, parsed_id) 1152 return 1153 report_result(device_id, parsed_camera, tablet_name, parsed_results) 1154 for temp_file in glob.glob('testbed_*.tmp'): 1155 os.remove(temp_file) 1156 break 1157 else: 1158 logging.error('No testbeds finished in the last %d seconds, ' 1159 'but still expected data. ' 1160 'Completed testbed indices: %s, ' 1161 'expected number of testbeds: %d', 1162 MERGE_RESULTS_TIMEOUT, list(completed_testbeds), 1163 num_testbeds) 1164 else: 1165 with open(f'testbed_{testbed_index}_completed.tmp', 'w') as _: 1166 pass 1167 1168if __name__ == '__main__': 1169 main() 1170