# Copyright 2024 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utility functions for verifying preview stabilization.
"""

import cv2
import logging
import os
import threading
import time

import camera_properties_utils
import its_session_utils
import sensor_fusion_utils
import video_processing_utils

_AREA_720P_VIDEO = 1280 * 720
_ASPECT_RATIO_16_9 = 16/9  # determine if preview fmt > 16:9
_ASPECT_TOL = 0.01
_GREEN_TOL = 200  # 200 out of 255 Green value in RGB
_GREEN_PERCENT = 95
_HIGH_RES_SIZE = '3840x2160'  # Resolution for 4K quality
_IMG_FORMAT = 'png'
_MIN_PHONE_MOVEMENT_ANGLE = 5  # degrees
_NUM_ROTATIONS = 24
_PREVIEW_DURATION = 400  # milliseconds
_PREVIEW_MAX_TESTED_AREA = 1920 * 1440
_PREVIEW_MIN_TESTED_AREA = 320 * 240
_PREVIEW_STABILIZATION_FACTOR = 0.7  # 70% of gyro movement allowed
_RED_BLUE_TOL = 20  # 20 out of 255 Red or Blue value in RGB
_SKIP_INITIAL_FRAMES = 15
_START_FRAME = 30  # give 3A some frames to warm up
_VIDEO_DELAY_TIME = 5.5  # seconds
_VIDEO_DURATION = 5.5  # seconds


def _size_to_area(size):
  """Returns the pixel area of a 'WxH' resolution string, e.g. '1920x1080'."""
  dims = size.split('x')
  return int(dims[0]) * int(dims[1])


def _select_capped_test_size(supported_preview_sizes, max_tested_area):
  """Picks the preview size to test from an already filtered size list.

  Args:
    supported_preview_sizes: list of str; candidate 'WxH' sizes.
    max_tested_area: int; largest area allowed when _HIGH_RES_SIZE is not
      among the candidates.

  Returns:
    _HIGH_RES_SIZE if it is a candidate, otherwise the last candidate whose
    area lies within [_PREVIEW_MIN_TESTED_AREA, max_tested_area].

  Raises:
    IndexError: if no candidate lies within the tested area range.
  """
  if _HIGH_RES_SIZE in supported_preview_sizes:
    return _HIGH_RES_SIZE

  capped_supported_preview_sizes = [
      size
      for size in supported_preview_sizes
      if _PREVIEW_MIN_TESTED_AREA <= _size_to_area(size) <= max_tested_area
  ]
  logging.debug('Capped preview resolutions: %s',
                capped_supported_preview_sizes)
  if not capped_supported_preview_sizes:
    raise IndexError(
        'No supported preview size with area in '
        f'[{_PREVIEW_MIN_TESTED_AREA}, {max_tested_area}]: '
        f'{supported_preview_sizes}')
  # NOTE(review): taking the last entry as the "max" size presumes the list is
  # ordered from smallest to largest by the camera service; confirm.
  return capped_supported_preview_sizes[-1]


def get_720p_or_above_size(supported_preview_sizes):
  """Returns the smallest size above or equal to 720p in preview and video.

  If the largest preview size is under 720P, returns the largest value.

  Args:
    supported_preview_sizes: list; preview sizes.
      e.g. ['1920x960', '1600x1200', '1920x1080']
  Returns:
    smallest size >= 720p video format, or the largest supported size when
    none reaches 720p. Empty string if the list is empty.
  """
  smallest_area = float('inf')
  smallest_720p_or_above_size = ''
  largest_supported_preview_size = ''
  largest_area = 0
  for size in supported_preview_sizes:
    area = _size_to_area(size)
    if smallest_area > area >= _AREA_720P_VIDEO:
      smallest_area = area
      smallest_720p_or_above_size = size
    # Track the overall largest size independently of the 720p candidate so
    # the fallback is correct whatever the order of the list.
    if area > largest_area:
      largest_area = area
      largest_supported_preview_size = size

  if smallest_720p_or_above_size:
    logging.debug('Smallest 720p or above size: %s',
                  smallest_720p_or_above_size)
    return smallest_720p_or_above_size

  logging.debug('Largest supported preview size: %s',
                largest_supported_preview_size)
  return largest_supported_preview_size


def collect_data(cam, tablet_device, preview_size, stabilize, rot_rig,
                 zoom_ratio=None, fps_range=None, hlg10=False, ois=False):
  """Capture a new set of data from the device.

  Builds a single preview surface for preview_size and records from it via
  collect_data_with_surfaces while the rotation rig moves the device.

  Args:
    cam: camera object.
    tablet_device: boolean; based on config file.
    preview_size: str; preview stream resolution. ex. '1920x1080'
    stabilize: boolean; whether preview stabilization is ON.
    rot_rig: dict with 'cntl' and 'ch' defined.
    zoom_ratio: float; static zoom ratio. None if default zoom.
    fps_range: list; target fps range.
    hlg10: boolean; whether to capture hlg10 output.
    ois: boolean; whether optical image stabilization is ON.
  Returns:
    recording object; a dictionary containing output path, video size, etc.
  """
  output_surfaces = cam.preview_surface(preview_size, hlg10)
  video_stream_index = 0
  if stabilize:
    stabilize_mode = camera_properties_utils.STABILIZATION_MODE_PREVIEW
  else:
    stabilize_mode = camera_properties_utils.STABILIZATION_MODE_OFF
  return collect_data_with_surfaces(cam, tablet_device, output_surfaces,
                                    video_stream_index, stabilize_mode, rot_rig,
                                    zoom_ratio, fps_range, ois)


def collect_data_with_surfaces(cam, tablet_device, output_surfaces,
                               video_stream_index, stabilize_mode, rot_rig,
                               zoom_ratio=None, fps_range=None, ois=False):
  """Capture a new set of data from the device.

  Starts the rotation rig in a background thread, starts sensor event
  collection, waits _VIDEO_DELAY_TIME seconds and then records a preview
  video of _VIDEO_DURATION seconds.

  Args:
    cam: camera object.
    tablet_device: boolean; based on config file.
    output_surfaces: list of dict; The list of output surfaces configured for
      the recording.
    video_stream_index: The index of output surface used for recording
    stabilize_mode: int; Video stabilization mode.
    rot_rig: dict with 'cntl' and 'ch' defined.
    zoom_ratio: float; static zoom ratio. None if default zoom.
    fps_range: list; target fps range.
    ois: boolean; whether optical image stabilization is ON.
  Returns:
    recording object; a dictionary containing output path, video size, etc.
  """
  logging.debug('Starting sensor event collection')
  serial_port = None
  if rot_rig['cntl'].lower() == sensor_fusion_utils.ARDUINO_STRING.lower():
    # identify port
    serial_port = sensor_fusion_utils.serial_port_def(
        sensor_fusion_utils.ARDUINO_STRING)
    # send test cmd to Arduino until cmd returns properly
    sensor_fusion_utils.establish_serial_comm(serial_port)

  # Start camera vibration
  if tablet_device:
    servo_speed = sensor_fusion_utils.ARDUINO_SERVO_SPEED_STABILIZATION_TABLET
  else:
    servo_speed = sensor_fusion_utils.ARDUINO_SERVO_SPEED_STABILIZATION
  p = threading.Thread(
      target=sensor_fusion_utils.rotation_rig,
      args=(
          rot_rig['cntl'],
          rot_rig['ch'],
          _NUM_ROTATIONS,
          sensor_fusion_utils.ARDUINO_ANGLES_STABILIZATION,
          servo_speed,
          sensor_fusion_utils.ARDUINO_MOVE_TIME_STABILIZATION,
          serial_port,
      ),
  )
  p.start()

  try:
    cam.start_sensor_events()
    # Allow time for rig to start moving
    time.sleep(_VIDEO_DELAY_TIME)

    # Record video and return recording object
    min_fps = fps_range[0] if (fps_range is not None) else None
    max_fps = fps_range[1] if (fps_range is not None) else None
    recording_obj = cam.do_preview_recording_multiple_surfaces(
        output_surfaces, video_stream_index, _VIDEO_DURATION, stabilize_mode,
        ois, zoom_ratio=zoom_ratio, ae_target_fps_min=min_fps,
        ae_target_fps_max=max_fps
    )

    logging.debug('Recorded output path: %s',
                  recording_obj['recordedOutputPath'])
    logging.debug('Tested quality: %s', recording_obj['quality'])
  finally:
    # Wait for vibration to stop, even if the recording failed, so the rig
    # thread never outlives this call.
    p.join()

  return recording_obj


def verify_preview_stabilization(recording_obj, gyro_events, test_name,
                                 log_path, facing, zoom_ratio=None,
                                 stabilization_mode=True):
  """Verify the returned recording is properly stabilized.

  Args:
    recording_obj: Camcorder recording object.
    gyro_events: Gyroscope events collected while recording.
    test_name: Name of the test.
    log_path: Path for the log file.
    facing: Facing of the camera device.
    zoom_ratio: Static zoom ratio. None if default zoom.
    stabilization_mode: boolean; Whether stabilization mode is ON.

  Returns:
    A dictionary containing the maximum gyro angle, the maximum camera angle,
    and a failure message if the recorded video isn't properly stabilized.

  Raises:
    AssertionError: if the gyro reports less than _MIN_PHONE_MOVEMENT_ANGLE
      degrees of movement.
  """
  file_name = recording_obj['recordedOutputPath'].split('/')[-1]
  logging.debug('recorded file name: %s', file_name)
  video_size = recording_obj['videoSize']
  logging.debug('video size: %s', video_size)

  # Get all frames from the video
  file_list = video_processing_utils.extract_all_frames_from_video(
      log_path, file_name, _IMG_FORMAT
  )

  logging.debug('Number of frames %d', len(file_list))
  # Extract camera rotations
  if zoom_ratio:
    zoom_ratio_suffix = f'{zoom_ratio:.1f}'
  else:
    zoom_ratio_suffix = '1'
  file_name_stem = (
      f'{os.path.join(log_path, test_name)}_{video_size}_{zoom_ratio_suffix}x'
      f'_stabilization={stabilization_mode}')
  cam_rots = sensor_fusion_utils.get_cam_rotations_from_files(
      file_list[_START_FRAME:],
      facing,
      file_name_stem,
      log_path,
      _START_FRAME,
      stabilized_video=stabilization_mode
  )
  sensor_fusion_utils.plot_camera_rotations(cam_rots, _START_FRAME,
                                            video_size, file_name_stem)
  max_camera_angle = sensor_fusion_utils.calc_max_rotation_angle(
      cam_rots, 'Camera')

  # Extract gyro rotations
  sensor_fusion_utils.plot_gyro_events(
      gyro_events,
      f'{test_name}_{video_size}_{zoom_ratio_suffix}x'
      f'_stabilization={stabilization_mode}',
      log_path
  )
  gyro_rots = sensor_fusion_utils.conv_acceleration_to_movement(
      gyro_events, _VIDEO_DELAY_TIME)
  max_gyro_angle = sensor_fusion_utils.calc_max_rotation_angle(
      gyro_rots, 'Gyro')

  # Guard the ratio: with zero gyro movement the division would raise
  # ZeroDivisionError and hide the 'Phone not moved enough' assertion below.
  if max_gyro_angle:
    angle_ratio = max_camera_angle / max_gyro_angle
  else:
    angle_ratio = float('nan')

  logging.debug('Stabilization mode: %s', stabilization_mode)
  logging.debug(
      'Max deflection (degrees) %s: video: %.3f, gyro: %.3f ratio: %.4f',
      video_size, max_camera_angle, max_gyro_angle, angle_ratio)

  # Assert phone is moved enough during test
  if max_gyro_angle < _MIN_PHONE_MOVEMENT_ANGLE:
    raise AssertionError(
        f'Phone not moved enough! Movement: {max_gyro_angle}, '
        f'THRESH: {_MIN_PHONE_MOVEMENT_ANGLE} degrees')

  # Formats wider than 16:9 get a 10% larger allowance.
  w_x_h = video_size.split('x')
  if int(w_x_h[0])/int(w_x_h[1]) > _ASPECT_RATIO_16_9:
    preview_stabilization_factor = _PREVIEW_STABILIZATION_FACTOR * 1.1
  else:
    preview_stabilization_factor = _PREVIEW_STABILIZATION_FACTOR

  camera_follows_gyro = (
      max_camera_angle >= max_gyro_angle * preview_stabilization_factor)
  angle_details = (
      f'Max preview angle: {max_camera_angle:.3f}, '
      f'Max gyro angle: {max_gyro_angle:.3f}, '
      f'ratio: {angle_ratio:.3f} '
      f'THRESH: {preview_stabilization_factor}.')

  failure_msg = None
  if camera_follows_gyro and stabilization_mode:
    # Fail if stabilization mode is on but the preview moved with the phone
    failure_msg = (
        f'{video_size} preview not stabilized enough! ' + angle_details)
  elif not camera_follows_gyro and not stabilization_mode:
    # Fail if stabilization mode is off but the preview looks stabilized
    failure_msg = (
        f'{video_size} preview is stabilized when testing stabilization=OFF! '
        + angle_details)

  # Delete saved frames if the format is a PASS
  if not failure_msg:
    for frame_file in file_list:
      try:
        os.remove(os.path.join(log_path, frame_file))
      except FileNotFoundError:
        logging.debug('File Not Found: %s', str(frame_file))
    logging.debug('Format %s passes, frame images removed', video_size)

  return {'gyro': max_gyro_angle, 'cam': max_camera_angle,
          'failure': failure_msg}


def collect_preview_data_with_zoom(cam, preview_size, zoom_start,
                                   zoom_end, step_size, recording_duration_ms,
                                   padded_frames=False):
  """Captures a preview video from the device.

  Records an unstabilized preview while the zoom ratio is swept from
  zoom_start to zoom_end.

  Args:
    cam: camera object.
    preview_size: str; preview resolution. ex. '1920x1080'.
    zoom_start: (float) is the starting zoom ratio during recording.
    zoom_end: (float) is the ending zoom ratio during recording.
    step_size: (float) is the step for zoom ratio during recording.
    recording_duration_ms: preview recording duration in ms.
    padded_frames: boolean; Whether to add additional frames at the beginning
      and end of recording to workaround issue with MediaRecorder.

  Returns:
    recording object as described by cam.do_preview_recording_with_dynamic_zoom.
  """
  recording_obj = cam.do_preview_recording_with_dynamic_zoom(
      preview_size,
      stabilize=False,
      sweep_zoom=(zoom_start, zoom_end, step_size, recording_duration_ms),
      padded_frames=padded_frames
  )
  logging.debug('Recorded output path: %s', recording_obj['recordedOutputPath'])
  logging.debug('Tested quality: %s', recording_obj['quality'])
  return recording_obj


def is_aspect_ratio_match(size_str, target_ratio):
  """Checks if a resolution string matches the target aspect ratio.

  Args:
    size_str: str; 'WxH' resolution. ex. '1920x1080'
    target_ratio: float; width/height ratio to compare against.

  Returns:
    True if width/height is within _ASPECT_TOL of target_ratio.
  """
  width, height = map(int, size_str.split('x'))
  return abs(width / height - target_ratio) < _ASPECT_TOL


def get_max_preview_test_size(cam, camera_id, aspect_ratio=None,
                              max_tested_area=_PREVIEW_MAX_TESTED_AREA):
  """Finds the max preview size to be tested.

  If the device supports the _HIGH_RES_SIZE preview size then
  it uses that for testing, otherwise uses the max supported
  preview size capped at max_tested_area.

  Args:
    cam: camera object
    camera_id: str; camera device id under test
    aspect_ratio: float; preferred width/height ratio. For example: 4/3.
      None to accept any aspect ratio.
    max_tested_area: area of max preview resolution

  Returns:
    preview_test_size: str; wxh resolution of the size to be tested

  Raises:
    IndexError: if no supported size falls within the tested area range.
  """
  supported_preview_sizes = cam.get_all_supported_preview_sizes(
      camera_id, filter_recordable=True)
  logging.debug('Resolutions supported by preview and MediaRecorder: %s',
                supported_preview_sizes)

  supported_preview_sizes = [
      size for size in supported_preview_sizes
      if _size_to_area(size) >= video_processing_utils.LOWEST_RES_TESTED_AREA
      and (aspect_ratio is None or is_aspect_ratio_match(size, aspect_ratio))
  ]
  logging.debug('Supported preview resolutions: %s', supported_preview_sizes)

  preview_test_size = _select_capped_test_size(
      supported_preview_sizes, max_tested_area)
  logging.debug('Selected preview resolution: %s', preview_test_size)

  return preview_test_size


def get_max_extension_preview_test_size(cam, camera_id, extension):
  """Finds the max preview size for an extension to be tested.

  If the device supports the _HIGH_RES_SIZE preview size then
  it uses that for testing, otherwise uses the max supported
  preview size capped at _PREVIEW_MAX_TESTED_AREA.

  Args:
    cam: camera object
    camera_id: str; camera device id under test
    extension: int; camera extension mode under test

  Returns:
    preview_test_size: str; wxh resolution of the size to be tested

  Raises:
    IndexError: if no supported size falls within the tested area range.
  """
  supported_preview_sizes = (
      cam.get_supported_extension_preview_sizes(camera_id, extension))
  supported_preview_sizes = [
      size for size in supported_preview_sizes
      if _size_to_area(size) >= video_processing_utils.LOWEST_RES_TESTED_AREA
  ]
  logging.debug('Supported preview resolutions for extension %d: %s',
                extension, supported_preview_sizes)

  preview_test_size = _select_capped_test_size(
      supported_preview_sizes, _PREVIEW_MAX_TESTED_AREA)
  logging.debug('Selected preview resolution: %s', preview_test_size)

  return preview_test_size


def is_image_green(image_path):
  """Checks if an image is mostly green.

  Checks if an image is mostly green by ensuring green is dominant
  and red/blue values are low.

  Args:
    image_path: str; The path to the image file.

  Returns:
    bool: True if mostly green, False otherwise.

  Raises:
    ValueError: if the image cannot be read.
  """
  image = cv2.imread(image_path)
  if image is None:
    # cv2.imread signals a missing or undecodable file by returning None.
    raise ValueError(f'Unable to read image: {image_path}')

  # Channel 1 is green; channels 0 and 2 are the blue/red channels and both
  # get the same tolerance, so their order does not matter here.
  green_pixels = ((image[:, :, 1] > _GREEN_TOL) &
                  (image[:, :, 0] < _RED_BLUE_TOL) &
                  (image[:, :, 2] < _RED_BLUE_TOL)).sum()

  green_percentage = (green_pixels / (image.shape[0] * image.shape[1])) * 100

  return bool(green_percentage >= _GREEN_PERCENT)


def preview_over_zoom_range(dut, cam, preview_size, z_min, z_max, z_step_size,
                            log_path):
  """Captures a preview video from the device over zoom range.

  Captures camera preview frames at various zoom level in zoom range.

  Args:
    dut: device under test
    cam: camera object
    preview_size: str; preview resolution. ex. '1920x1080'
    z_min: minimum zoom for preview capture
    z_max: maximum zoom for preview capture
    z_step_size: zoom step size from min to max
    log_path: str; path for video file directory

  Returns:
    capture_results: total capture results of each frame
    file_list: file name for each frame

  Raises:
    AssertionError: if the number of capture results differs from the number
      of camera (non-green) frames.
  """
  logging.debug('z_min : %.2f, z_max = %.2f, z_step_size = %.2f',
                z_min, z_max, z_step_size)

  # Converge 3A
  cam.do_3a()

  # recording preview
  # TODO: b/350821827 - encode time stamps in camera frames instead of
  #                     padded green frames
  # MediaRecorder on some devices drop last few frames. To solve this issue
  # add green frames as padding at the end of recorded camera frames. This way
  # green buffer frames would be dropped by MediaRecorder instead of actual
  # frames. Later these green padded frames are removed.
  preview_rec_obj = collect_preview_data_with_zoom(
      cam, preview_size, z_min, z_max, z_step_size,
      _PREVIEW_DURATION, padded_frames=True)

  preview_file_name = its_session_utils.pull_file_from_dut(
      dut, preview_rec_obj['recordedOutputPath'], log_path)

  logging.debug('recorded video size : %s',
                str(preview_rec_obj['videoSize']))

  # Extract frames as png from mp4 preview recording
  file_list = video_processing_utils.extract_all_frames_from_video(
      log_path, preview_file_name, _IMG_FORMAT
  )

  # Find index of the first non-green frame
  first_camera_frame_idx = None
  for idx, file_name in enumerate(file_list):
    file_path = os.path.join(log_path, file_name)
    if is_image_green(file_path):
      its_session_utils.remove_file(file_path)
      logging.debug('Removed green file %s', file_name)
    else:
      logging.debug('First camera frame: %s', file_name)
      first_camera_frame_idx = idx
      break

  if first_camera_frame_idx is None:
    # Every frame was green padding and has already been removed above, so
    # there is nothing left to scan; the count check below reports the issue.
    logging.debug('No camera frames found in recording')
    file_list = []
  else:
    # Find index of last non-green frame. The scan stops at the first camera
    # frame at the latest, because that frame is known to be non-green.
    last_camera_frame_idx = first_camera_frame_idx
    for idx in range(len(file_list) - 1, first_camera_frame_idx - 1, -1):
      file_name = file_list[idx]
      file_path = os.path.join(log_path, file_name)
      if is_image_green(file_path):
        its_session_utils.remove_file(file_path)
        logging.debug('Removed green file %s', file_name)
      else:
        logging.debug('Last camera frame: %s', file_name)
        last_camera_frame_idx = idx
        break

    logging.debug('start idx = %d -- end idx = %d', first_camera_frame_idx,
                  last_camera_frame_idx)
    file_list = file_list[first_camera_frame_idx:last_camera_frame_idx+1]

  # Raise error if capture result and frame count doesn't match
  capture_results = preview_rec_obj['captureMetadata']
  extra_capture_result_count = len(capture_results) - len(file_list)
  logging.debug('Number of frames %d', len(file_list))
  if extra_capture_result_count != 0:
    its_session_utils.remove_frame_files(log_path)
    e_msg = (f'Number of CaptureResult ({len(capture_results)}) '
             f'vs number of Frames ({len(file_list)}) count mismatch.'
             ' Retry Test.')
    raise AssertionError(e_msg)

  # skip frames which might not have 3A converged
  capture_results = capture_results[_SKIP_INITIAL_FRAMES:]
  skipped_files = file_list[:_SKIP_INITIAL_FRAMES]
  file_list = file_list[_SKIP_INITIAL_FRAMES:]

  # delete skipped files
  for file_name in skipped_files:
    its_session_utils.remove_file(os.path.join(log_path, file_name))

  return capture_results, file_list