• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2014 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import glob
16import json
17import logging
18import os
19import os.path
20import re
21import subprocess
22import sys
23import tempfile
24import time
25import types
26
27import camera_properties_utils
28import capture_request_utils
29import image_processing_utils
30import its_device_utils
31import its_session_utils
32import lighting_control_utils
33import numpy as np
34import yaml
35
36
# config.yml is read from the directory named by the CAMERA_ITS_TOP
# environment variable; importing this module fails with KeyError if unset.
YAML_FILE_DIR = os.environ['CAMERA_ITS_TOP']
CONFIG_FILE = os.path.join(YAML_FILE_DIR, 'config.yml')

# Substrings looked for in the lower-cased testbed name of the config file.
TEST_KEY_TABLET = 'tablet'
TEST_KEY_SENSOR_FUSION = 'sensor_fusion'
TEST_KEY_GEN2 = 'gen2'

# Durations, in seconds.
ACTIVITY_START_WAIT = 1.5
MERGE_RESULTS_TIMEOUT = 3600

NUM_TRIES = 2

# Per-scene result values and the keys of the per-scene result dictionary.
RESULT_PASS = 'PASS'
RESULT_FAIL = 'FAIL'
RESULT_NOT_EXECUTED = 'NOT_EXECUTED'
RESULT_VALUES = (RESULT_PASS, RESULT_FAIL, RESULT_NOT_EXECUTED)
RESULT_KEY = 'result'
METRICS_KEY = 'mpc_metrics'
PERFORMANCE_KEY = 'performance_metrics'
FEATURE_QUERY_KEY = 'feature_query_proto'
FEATURE_QUERY_PATH_KEY = 'feature_query_proto_path'
SUMMARY_KEY = 'summary'
TIME_KEY_START = 'start'
TIME_KEY_END = 'end'

# CtsVerifier package, result broadcast action and its string extras.
CTS_VERIFIER_PACKAGE_NAME = 'com.android.cts.verifier'
ACTION_ITS_RESULT = f'{CTS_VERIFIER_PACKAGE_NAME}.camera.its.ACTION_ITS_RESULT'
CURRENT_ITS_VERSION = '1.0'  # version number to sync with CtsVerifier
EXTRA_VERSION = 'camera.its.extra.VERSION'
EXTRA_CAMERA_ID = 'camera.its.extra.CAMERA_ID'
EXTRA_RESULTS = 'camera.its.extra.RESULTS'
EXTRA_TABLET_NAME = 'camera.its.extra.TABLET_NAME'

VALID_CONTROLLERS = ('arduino', 'canakit')
_FRONT_CAMERA_ID = '1'
_MAIN_TESTBED = 0

# recover replaced '_' in scene def: '11' -> '1_1', '12' -> '1_2', '13' -> '1_3'
_INT_STR_DICT = types.MappingProxyType(
    {f'1{part}': f'1_{part}' for part in '123'})

# Device properties compared by are_devices_similar().
_PROPERTIES_TO_MATCH = (
    'ro.product.model',
    'ro.product.name',
    'ro.build.display.id',
    'ro.revision',
)
73
# Notes on scene names:
#   scene*_1/2/... are same scene split to load balance run times for scenes
#   scene*_a/b/... are similar scenes that share one or more tests
#   Nested scene names are built with os.path.join.

# Scene extensions
_EXTENSIONS_SCENES = tuple(
    os.path.join('scene_extensions', extension_scene)
    for extension_scene in ('scene_hdr', 'scene_low_light'))

# Scene tele
_TELE_SCENES = tuple(
    os.path.join('scene_tele', tele_scene)
    for tele_scene in ('scene6_tele', 'scene7_tele'))

# Scenes that can be automated through tablet display
_TABLET_SCENES = (
    'scene0',
    'scene1_1', 'scene1_2', 'scene1_3',
    'scene2_a', 'scene2_b', 'scene2_c', 'scene2_d', 'scene2_e', 'scene2_f',
    'scene2_g',
    'scene3', 'scene4', 'scene6', 'scene7', 'scene8', 'scene9',
    *_EXTENSIONS_SCENES,
    *_TELE_SCENES,
    'scene_video',
)

# Scenes that use the 'sensor_fusion' test rig
_MOTION_SCENES = ('sensor_fusion', 'feature_combination')

# Scenes that use lighting control
_FLASH_SCENES = ('scene_flash',)

# Scenes that use a checkerboard as chart
_CHECKERBOARD_SCENES = ('sensor_fusion', 'scene_flash', 'feature_combination')

# Scenes that have to be run manually regardless of configuration
_MANUAL_SCENES = ('scene5',)

_GEN2_RIG_SCENES = ('scene_ip',)

# All possible scenes
_ALL_SCENES = (
    _TABLET_SCENES
    + _MANUAL_SCENES
    + _MOTION_SCENES
    + _FLASH_SCENES
    + _GEN2_RIG_SCENES
)

# Scenes that are logically grouped and can be called as group
# scene6_tele is not grouped with scene6 because it requires extension rig
_GROUPED_SCENES = types.MappingProxyType({
    'scene1': tuple(f'scene1_{part}' for part in (1, 2, 3)),
    'scene2': tuple(f'scene2_{variant}' for variant in 'abcdefg'),
})
121
# Scene requirements for manual testing.
# Maps each scene name to the setup description shown to the operator before
# a manual scene check (None when nothing has to be set up).
# Adjacent string literals are concatenated as-is, so every fragment that is
# continued on the next line must end with a space.
_SCENE_REQ = types.MappingProxyType({
    'scene0': None,
    'scene1_1': 'A grey card covering at least the middle 30% of the scene',
    'scene1_2': 'A grey card covering at least the middle 30% of the scene',
    'scene1_3': 'A grey card covering at least the middle 30% of the scene, '
                'without a white border like scene1_1 or scene1_2',
    'scene2_a': 'The picture with 3 faces in tests/scene2_a/scene2_a.png',
    'scene2_b': 'The picture with 3 faces in tests/scene2_b/scene2_b.png',
    'scene2_c': 'The picture with 3 faces in tests/scene2_c/scene2_c.png',
    'scene2_d': 'The picture with 3 faces in tests/scene2_d/scene2_d.png',
    'scene2_e': 'The picture with 3 faces in tests/scene2_e/scene2_e.png',
    'scene2_f': 'The picture with 3 faces in tests/scene2_f/scene2_f.png',
    'scene2_g': 'The picture with 3 profile faces in '
                'tests/scene2_g/scene2_g.png',
    'scene3': 'The ISO12233 chart',
    'scene4': 'A test chart of a circle covering at least the middle 50% of '
              'the scene. See tests/scene4/scene4.png',
    'scene5': 'Capture images with a diffuser attached to the camera. See '
              'source.android.com/docs/compatibility/cts/camera-its-tests#scene5/diffuser '  # pylint: disable=line-too-long
              'for more details',
    'scene6': 'A grid of ArUco markers on a white background. '
              'See tests/scene6/scene6.png',
    'scene7': 'The picture with 4 different colors, slanted edge and '
              '4 ArUco markers. See tests/scene7/scene7.png',
    'scene8': 'The picture with 4 faces in 4 different colors overlay. '
              'See tests/scene8/scene8.png',
    'scene9': 'A scene with high entropy consisting of random size and colored '
              'circles. See tests/scene9/scene9.png',
    # Use os.path to avoid confusion on other platforms
    os.path.join('scene_extensions', 'scene_hdr'): (
        'A tablet displayed scene with a face on the left '
        'and a low-contrast QR code on the right. '
        'See tests/scene_extensions/scene_hdr/scene_hdr.png'
    ),
    os.path.join('scene_extensions', 'scene_low_light'): (
        'A tablet displayed scene with a grid of squares of varying '
        'brightness. See '
        'tests/scene_extensions/scene_low_light/scene_low_light.png'
    ),
    os.path.join('scene_tele', 'scene6_tele'): (
        'Identical to scene6, but for tele cameras. '
        'See tests/scene_tele/scene6_tele/scene6_tele.png'
    ),
    os.path.join('scene_tele', 'scene7_tele'): (
        'Identical to scene7, but for tele cameras. '
        'See tests/scene_tele/scene7_tele/scene7_tele.png'
    ),
    'sensor_fusion': 'A checkerboard pattern for phone to rotate in front of '
                     'in tests/sensor_fusion/checkerboard.pdf\n'
                     'See tests/sensor_fusion/SensorFusion.pdf for detailed '
                     'instructions.\nNote that this test will be skipped '
                     'on devices not supporting REALTIME camera timestamp.',
    'feature_combination': 'The same scene as sensor_fusion, '
                           'separated for easier testing.',
    'scene_flash': 'A checkerboard pattern chart with lights off.',
    'scene_video': 'A tablet displayed scene with a series of circles moving '
                   'at different simulated frame rates. '
                   'See tests/scene_video/scene_video.mp4',
    'scene_ip': 'A chart with features such as QR code, color checker chart, '
                'dead leaf patch, dynamic range chart used to analyze metrics '
                'such as brightness, sharpness, color accuracy.',
})
185
# Tests listed per scene for sub cameras.
# Kept as a plain (mutable) dict so that augment_sub_camera_tests() can add
# entries depending on the first API level of the device.
SUB_CAMERA_TESTS = {
    'scene0': ('test_jitter', 'test_metadata', 'test_request_capture_match',
               'test_sensor_events', 'test_solid_color_test_pattern',
               'test_unified_timestamps'),
    'scene1_1': ('test_burst_capture', 'test_burst_sameness_manual',
                 'test_exposure_x_iso', 'test_linearity'),
    'scene1_2': ('test_raw_exposure',),
    'scene1_3': ('test_dng_noise_model', 'test_raw_sensitivity',
                 'test_yuv_plus_raw'),
    'scene2_a': ('test_num_faces',),
    'scene4': ('test_aspect_ratio_and_crop',),
    'scene_tele/scene6_tele': ('test_zoom_tele', 'test_preview_zoom_tele'),
    'scene_tele/scene7_tele': ('test_multi_camera_switch_tele',),
    'scene_video': ('test_preview_frame_drop',),
    'sensor_fusion': ('test_sensor_fusion',),
}
230
# File names of the tests grouped under lighting control.
_LIGHTING_CONTROL_TESTS = tuple(
    f'{test_name}.py' for test_name in (
        'test_auto_flash',
        'test_preview_min_frame_rate',
        'test_led_snapshot',
        'test_night_extension',
        'test_low_light_boost_extension',
        'test_hdr_extension',
    ))

# Abbreviated extension scene names accepted on the command line.
_EXTENSION_NAMES = ('hdr', 'low_light')

# On-device destination directories.
_DST_ROOT_DIR = '/sdcard/'
_DST_SCENE_DIR = f'{_DST_ROOT_DIR}Download/'

# Number of parts in a sub camera id (camera id + hidden physical id).
_SUB_CAMERA_LEVELS = 2
MOBLY_TEST_SUMMARY_TXT_FILE = 'test_mobly_summary.txt'
249
250
def report_result(device_id, camera_id, tablet_name, results):
  """Sends a pass/fail result to the device, via an intent.

  Starts the ITS test activity, validates each scene entry, pushes any summary
  and feature query proto files to the device, and finally broadcasts the
  results as JSON.

  Args:
   device_id: The ID string of the device to report the results to.
   camera_id: The ID string of the camera for which to report pass/fail.
   tablet_name: The tablet name to identify model and build.
   results: a dictionary contains all ITS scenes as key and result/summary of
            current ITS run. See test_report_result unit test for an example.
            Summary entries are rewritten in place to the on-device path.

  Raises:
    ValueError: if a scene has no result entry, or its result is not one of
      RESULT_VALUES.
  """
  adb = f'adb -s {device_id}'
  its_device_utils.start_its_test_activity(device_id)
  time.sleep(ACTIVITY_START_WAIT)

  # Validate/process results argument
  for scene, scene_results in results.items():
    if RESULT_KEY not in scene_results:
      raise ValueError(f'ITS result not found for {scene}')
    if scene_results[RESULT_KEY] not in RESULT_VALUES:
      # The offending value lives in the per-scene dict, not in the top-level
      # results dict (which is keyed by scene name).
      raise ValueError(
          f'Unknown ITS result for {scene}: {scene_results[RESULT_KEY]}')
    if SUMMARY_KEY in scene_results:
      device_summary_path = f'/sdcard/its_camera{camera_id}_{scene}.txt'
      its_device_utils.run(
          f'{adb} push {scene_results[SUMMARY_KEY]} {device_summary_path}')
      # The broadcast must reference the copy on the device, not the host file.
      scene_results[SUMMARY_KEY] = device_summary_path
    if (FEATURE_QUERY_KEY in scene_results and
        FEATURE_QUERY_PATH_KEY in scene_results):
      for (proto_path, proto_file) in zip(
          scene_results[FEATURE_QUERY_PATH_KEY],
          scene_results[FEATURE_QUERY_KEY]):
        host_path = os.path.join(proto_path, proto_file)
        its_device_utils.run(
            f'{adb} push {host_path} {_DST_ROOT_DIR}')

  json_results = json.dumps(results)
  cmd = (f"{adb} shell am broadcast -a {ACTION_ITS_RESULT} --es {EXTRA_VERSION}"
         f" {CURRENT_ITS_VERSION} --es {EXTRA_CAMERA_ID} {camera_id} --es "
         f"{EXTRA_TABLET_NAME} {tablet_name} --es "
         f"{EXTRA_RESULTS} \'{json_results}\'")
  its_device_utils.run(cmd)
291
292
def write_result(testbed_index, device_id, camera_id, results):
  """Dumps one testbed/camera result set to a .tmp JSON file for merging.

  The file is created in the current working directory and is named
  testbed_<testbed_index>_camera_<camera_id>.tmp.

  Args:
    testbed_index: the index of a finished testbed.
    device_id: the ID string of the device that created results.
    camera_id: the ID string of the camera of the device.
    results: a dictionary that contains all ITS scenes as key
             and result/summary of current ITS run.
  """
  tmp_file_name = f'testbed_{testbed_index}_camera_{camera_id}.tmp'
  with open(tmp_file_name, 'w') as tmp_file:
    json.dump(dict(device_id=device_id, results=results), tmp_file)
307
308
def parse_testbeds(completed_testbeds):
  """Reads back the .tmp result files of completed testbeds.

  For every testbed index, each file in the current working directory that
  matches testbed_<index>_camera_*.tmp is loaded as JSON.

  Args:
    completed_testbeds: an iterable of completed testbed indices.
  Yields:
    device_id: the device associated with the testbed.
    camera_id: one of the camera_ids associated with the testbed.
    results: the dictionary with scenes and result/summary of testbed's run.
  Raises:
    ValueError: if a file holds an empty device_id or empty results.
  """
  for testbed_index in completed_testbeds:
    tmp_file_pattern = f'testbed_{testbed_index}_camera_*.tmp'
    for tmp_file_name in glob.glob(tmp_file_pattern):
      # The camera id sits between 'camera_' and the '.tmp' suffix.
      after_camera_tag = tmp_file_name.split('camera_')[1]
      camera_id = after_camera_tag.split('.tmp')[0]
      with open(tmp_file_name, 'r') as tmp_file:
        payload = json.load(tmp_file)
        device_id, results = payload['device_id'], payload['results']
      if not (device_id and results):
        raise ValueError(
            f'device_id or results for {tmp_file_name} not found.')
      yield device_id, camera_id, results
331
332
def get_device_property(device_id, property_name):
  """Queries a system property of a device with 'adb shell getprop'.

  Args:
    device_id: the ID string of a device.
    property_name: the desired property string.
  Returns:
    The command output (stdout and stderr combined), decoded as UTF-8 and
    stripped of surrounding whitespace.
  """
  getprop_cmd = f'adb -s {device_id} shell getprop {property_name}'
  output_bytes = subprocess.check_output(
      getprop_cmd, shell=True, stderr=subprocess.STDOUT)
  return output_bytes.decode('utf-8').strip()
346
347
def are_devices_similar(device_id_1, device_id_2):
  """Compares the properties in _PROPERTIES_TO_MATCH between two devices.

  The comparison stops at the first mismatch, which is logged as an error.

  Args:
    device_id_1: the ID string of the _MAIN_TESTBED device.
    device_id_2: the ID string of another device.
  Returns:
    True if every compared property has the same value on both devices.
  """
  for prop_name in _PROPERTIES_TO_MATCH:
    main_value = get_device_property(device_id_1, prop_name)
    other_value = get_device_property(device_id_2, prop_name)
    if main_value == other_value:
      continue
    logging.error('%s does not match %s for %s',
                  main_value, other_value, prop_name)
    return False
  return True
365
366
def check_manual_scenes(device_id, camera_id, scene, out_path):
  """Pauses the run until the operator confirms the scene setup.

  Repeats prompt -> 3A -> capture -> save image until the operator answers
  'y' (case-insensitive) to the confirmation question.

  Args:
    device_id: id of device
    camera_id: id of camera; may contain a hidden physical camera id after
      its_session_utils.SUB_CAMERA_SEPARATOR.
    scene: Name of the scene to copy image files.
    out_path: output file location
  """
  hidden_physical_id = None
  separator = its_session_utils.SUB_CAMERA_SEPARATOR
  if separator in camera_id:
    id_parts = camera_id.split(separator)
    if len(id_parts) == _SUB_CAMERA_LEVELS:
      camera_id, hidden_physical_id = id_parts

  with its_session_utils.ItsSession(
      device_id=device_id,
      camera_id=camera_id,
      hidden_physical_id=hidden_physical_id) as cam:
    props = cam.override_with_hidden_physical_camera_props(
        cam.get_camera_properties())

    scene_confirmed = False
    while not scene_confirmed:
      input(f'\n Press <ENTER> after positioning camera {camera_id} with '
            f'{scene}.\n The scene setup should be: \n  {_SCENE_REQ[scene]}\n')
      # Converge 3A prior to capture
      if scene != 'scene5':
        cam.do_3a()
      else:
        # scene5 runs 3A without AF and with the AE/AWB lock values
        # derived from props.
        cam.do_3a(do_af=False,
                  lock_ae=camera_properties_utils.ae_lock(props),
                  lock_awb=camera_properties_utils.awb_lock(props))
      req, fmt = capture_request_utils.get_fastest_auto_capture_settings(props)
      logging.info('Capturing an image to check the test scene')
      capture = cam.do_capture(req, fmt)
      rgb_image = image_processing_utils.convert_capture_to_rgb_image(capture)
      image_file_name = f'test_{scene.replace("/", "_")}.jpg'
      image_path = os.path.join(out_path, image_file_name)
      logging.info('Please check scene setup in %s', image_path)
      image_processing_utils.write_image(rgb_image, image_path)
      answer = input(f'Is the image okay for ITS {scene}? (Y/N)')
      scene_confirmed = answer.lower() == 'y'
409
410
def get_config_file_contents():
  """Loads CONFIG_FILE with yaml.safe_load.

  Returns:
    The parsed contents of config.yml.
  """
  with open(CONFIG_FILE) as config_stream:
    return yaml.safe_load(config_stream)
423
424
def get_test_params(config_file_contents):
  """Extracts the 'TestParams' entry from the config file contents.

  Every value of the top-level dict is walked as a list of dicts; the
  'TestParams' lookup of the last dict visited is the one returned.

  Args:
    config_file_contents: dict read from config.yml file

  Returns:
    dict of test parameters, or None if nothing was visited or the last dict
    visited has no 'TestParams' entry.
  """
  params_per_entry = [
      entry.get('TestParams')
      for entries in config_file_contents.values()
      for entry in entries
  ]
  return params_per_entry[-1] if params_per_entry else None
439
440
def get_device_serial_number(device, config_file_contents):
  """Returns the serial number of the device with label from the config file.

  The config file contains a TestBeds dictionary which contains Controllers
  and AndroidDevice dicts. The two devices used by the test per box are listed
  there with the labels dut and tablet. Parse through the nested TestBeds dict
  to get the Android device details. If several devices carry the same label,
  the last one found wins.

  Args:
    device: String device label as specified in config file: 'tablet' selects
      the tablet, any other value selects the dut.
    config_file_contents: dict read from config.yml file

  Returns:
    The serial of the requested device, as a string.

  Raises:
    ValueError: if no AndroidDevice with the requested label is found.
  """
  # Start from None so a missing label is reported clearly below instead of
  # surfacing as an UnboundLocalError at the return statement.
  tablet_device_id = None
  dut_device_id = None
  for testbeds in config_file_contents.values():
    for testbed in testbeds:
      android_device_contents = testbed.get('Controllers')
      for device_dict in android_device_contents.get('AndroidDevice'):
        for label in device_dict.values():
          if label == 'tablet':
            tablet_device_id = str(device_dict.get('serial'))
          if label == 'dut':
            dut_device_id = str(device_dict.get('serial'))

  if device == 'tablet':
    requested_label, serial = 'tablet', tablet_device_id
  else:
    requested_label, serial = 'dut', dut_device_id
  if serial is None:
    raise ValueError(
        f'No AndroidDevice labeled {requested_label!r} found in config file.')
  return serial
467
468
def get_updated_yml_file(yml_file_contents):
  """Writes the testbed contents to a fresh config_*.yml file.

  The file is created in YAML_FILE_DIR, whose mode is first set to 0o755.
  This testbed file is per box and contains all the parameters and
  device id used by the mobly tests.

  Args:
   yml_file_contents: Data to write in yml file.

  Returns:
    The base name (no directory) of the newly written yml file.
  """
  os.chmod(YAML_FILE_DIR, 0o755)
  yml_fd, yml_path = tempfile.mkstemp(
      suffix='.yml', prefix='config_', dir=YAML_FILE_DIR)
  # Write through the descriptor mkstemp already opened.
  with os.fdopen(yml_fd, 'w') as yml_stream:
    yaml.dump(yml_file_contents, stream=yml_stream, default_flow_style=False)
  return os.path.basename(yml_path)
489
490
def enable_external_storage(device_id):
  """Sets the MANAGE_EXTERNAL_STORAGE app op of CtsVerifier to 'allow'.

  Args:
    device_id: Serial number of the device.

  """
  appops_cmd = ' '.join((
      f'adb -s {device_id} shell appops',
      'set com.android.cts.verifier MANAGE_EXTERNAL_STORAGE allow',
  ))
  its_device_utils.run(appops_cmd)
501
502
def get_available_cameras(device_id, camera_id):
  """Lists the camera ids that are available in the current state.

  Takes the camera ids reported by the ITS session and drops each
  '<camera_id>.<physical id>' entry whose physical camera is reported as
  unavailable.

  Args:
    device_id: Serial number of the device.
    camera_id: Logical camera_id

  Returns:
    List of all the available camera_ids.
  """
  with its_session_utils.ItsSession(
      device_id=device_id,
      camera_id=camera_id) as cam:
    # The properties are queried but their value is not used below.
    cam.override_with_hidden_physical_camera_props(
        cam.get_camera_properties())
    unavailable_physical = cam.get_unavailable_physical_cameras(camera_id)[
        'unavailablePhysicalCamerasArray']
    camera_ids = cam.get_camera_ids()['cameraIdArray']
    for physical_id in unavailable_physical:
      # Concat camera_id, physical camera_id and sub camera separator
      sub_camera_id = f'{camera_id}.{physical_id}'
      if sub_camera_id in camera_ids:
        camera_ids.remove(sub_camera_id)
    logging.debug('available camera ids: %s', camera_ids)
  return camera_ids
532
533
def get_unavailable_physical_cameras(device_id, camera_id):
  """Lists the physical cameras that are unavailable in the current state.

  Args:
    device_id: Serial number of the device.
    camera_id: Logical camera device id

  Returns:
    List of unavailable camera ids, each formatted as
    '<camera_id>.<physical id>'.
  """
  with its_session_utils.ItsSession(
      device_id=device_id,
      camera_id=camera_id) as cam:
    reported = cam.get_unavailable_physical_cameras(camera_id)
    unavailable_ids = []
    for physical_id in reported['unavailablePhysicalCamerasArray']:
      unavailable_ids.append(f'{camera_id}.{physical_id}')
    logging.debug('Unavailable physical camera ids: %s', unavailable_ids)
  return unavailable_ids
556
557
def is_device_folded(device_id):
  """Returns True if the foldable device is in folded state.

  The device is treated as folded when the output of
  'adb shell cmd device_state state' contains 'CLOSE'.

  Args:
    device_id: Serial number of the foldable device.
  """
  state_cmd = f'adb -s {device_id} shell cmd device_state state'
  return 'CLOSE' in subprocess.getoutput(state_cmd)
569
570
def augment_sub_camera_tests(first_api_level):
  """Extends SUB_CAMERA_TESTS for devices first launched on Android 15+.

  Args:
    first_api_level: First api level of the device.
  """
  if first_api_level < its_session_utils.ANDROID15_API_LEVEL:
    return
  logging.debug('Augmenting sub camera tests')
  SUB_CAMERA_TESTS['scene6'] = ('test_in_sensor_zoom',)
580
581
582def main():
583  """Run all the Camera ITS automated tests.
584
585    Script should be run from the top-level CameraITS directory.
586
587    Command line arguments:
588        camera:  the camera(s) to be tested. Use comma to separate multiple
589                 camera Ids. Ex: "camera=0,1" or "camera=1"
590        scenes:  the test scene(s) to be executed. Use comma to separate
591                 multiple scenes. Ex: "scenes=scene0,scene1_1" or
592                 "scenes=0,1_1,sensor_fusion" (sceneX can be abbreviated by X
593                 where X is scene name minus 'scene')
594  """
595  logging.basicConfig(level=logging.INFO)
596  # Make output directories to hold the generated files.
597  topdir = tempfile.mkdtemp(prefix='CameraITS_')
598  try:
599    subprocess.call(['chmod', 'g+rx', topdir])
600  except OSError as e:
601    logging.info(repr(e))
602
603  scenes = []
604  camera_id_combos = []
605  testbed_index = None
606  num_testbeds = None
607
608  # Override camera, scenes and testbed with cmd line values if available
609  for s in list(sys.argv[1:]):
610    if 'scenes=' in s:
611      scenes = s.split('scenes=')[1].split(',')
612    elif 'camera=' in s:
613      camera_id_combos = s.split('=')[1].split(',')
614    elif 'testbed_index=' in s:
615      testbed_index = int(s.split('=')[1])
616    elif 'num_testbeds=' in s:
617      num_testbeds = int(s.split('=')[1])
618    else:
619      raise ValueError(f'Unknown argument {s}')
620  if testbed_index is None and num_testbeds is not None:
621    raise ValueError(
622        'testbed_index must be specified if num_testbeds is specified.')
623  if (testbed_index is not None and num_testbeds is not None and
624      testbed_index >= num_testbeds):
625    raise ValueError('testbed_index must be less than num_testbeds. '
626                     'testbed_index starts at 0.')
627
628  # Prepend 'scene' if not specified at cmd line
629  for i, s in enumerate(scenes):
630    if (not s.startswith('scene') and
631        not s.startswith(('checkerboard', 'sensor_fusion',
632                          'flash', 'feature_combination', '<scene-name>'))):
633      scenes[i] = f'scene{s}'
634    if (s.startswith('flash') or s.startswith('extensions')
635        or s.startswith('ip')):
636      scenes[i] = f'scene_{s}'
637    # Handle scene_extensions
638    if any(s.startswith(extension) for extension in _EXTENSION_NAMES):
639      scenes[i] = f'scene_extensions/scene_{s}'
640    if (any(s.startswith('scene_' + extension)
641            for extension in _EXTENSION_NAMES)):
642      scenes[i] = f'scene_extensions/{s}'
643    # Handle scene_tele
644    if s == 'scene6_tele' or s == 'scene7_tele':
645      scenes[i] = f'scene_tele/{s}'
646
647  # Read config file and extract relevant TestBed
648  config_file_contents = get_config_file_contents()
649  if testbed_index is None:
650    testbed_to_remove = []
651    for i in config_file_contents['TestBeds']:
652      name = i['Name'].lower()
653      if scenes == ['scene_ip']:
654        if TEST_KEY_GEN2 not in name:
655          testbed_to_remove.append(i)
656      elif set(scenes).intersection(
657          set(_CHECKERBOARD_SCENES)) or scenes == ['checkerboard']:
658        if TEST_KEY_SENSOR_FUSION not in name:
659          testbed_to_remove.append(i)
660      else:
661        if TEST_KEY_SENSOR_FUSION in name or TEST_KEY_GEN2 in name:
662          testbed_to_remove.append(i)
663    for i in testbed_to_remove:
664      config_file_contents['TestBeds'].remove(i)
665  else:
666    config_file_contents = {
667        'TestBeds': [config_file_contents['TestBeds'][testbed_index]]
668    }
669
670  # Get test parameters from config file
671  test_params_content = get_test_params(config_file_contents)
672  if not camera_id_combos:
673    camera_id_combos = str(test_params_content['camera']).split(',')
674  if not scenes:
675    scenes = str(test_params_content['scene']).split(',')
676    scenes = [_INT_STR_DICT.get(n, n) for n in scenes]  # recover '1_1' & '1_2'
677
678  device_id = get_device_serial_number('dut', config_file_contents)
679  # Enable external storage on DUT to send summary report to CtsVerifier.apk
680  enable_external_storage(device_id)
681
682  # Add to SUB_CAMERA_TESTS depending on first_api_level
683  augment_sub_camera_tests(its_session_utils.get_first_api_level(device_id))
684
685  # Verify that CTS Verifier is installed
686  its_session_utils.check_apk_installed(device_id, CTS_VERIFIER_PACKAGE_NAME)
687  # Check whether the dut is foldable or not
688  testing_foldable_device = True if test_params_content[
689      'foldable_device'] == 'True' else False
690  available_camera_ids_to_test_foldable = []
691  if testing_foldable_device:
692    logging.debug('Testing foldable device.')
693    # Check the state of foldable device. True if device is folded,
694    # false if the device is opened.
695    device_folded = is_device_folded(device_id)
696    # list of available camera_ids to be tested in device state
697    available_camera_ids_to_test_foldable = get_available_cameras(
698        device_id, _FRONT_CAMERA_ID)
699
700  config_file_test_key = config_file_contents['TestBeds'][0]['Name'].lower()
701  logging.info('Saving %s output files to: %s', config_file_test_key, topdir)
702  if TEST_KEY_TABLET in config_file_test_key:
703    tablet_id = get_device_serial_number('tablet', config_file_contents)
704    tablet_name_cmd = f'adb -s {tablet_id} shell getprop ro.product.device'
705    raw_output = subprocess.check_output(
706        tablet_name_cmd, stderr=subprocess.STDOUT, shell=True)
707    tablet_name = str(raw_output.decode('utf-8')).strip()
708    logging.debug('Tablet name: %s', tablet_name)
709    brightness = test_params_content['brightness']
710    its_session_utils.validate_tablet(tablet_name, brightness, tablet_id)
711  elif TEST_KEY_GEN2 in config_file_test_key:
712    tablet_id = None
713    tablet_name = 'ip_chart'
714  else:
715    tablet_id = None
716    tablet_name = 'sensor_fusion'
717
718  testing_sensor_fusion_with_controller = False
719  if TEST_KEY_SENSOR_FUSION in config_file_test_key:
720    if test_params_content['rotator_cntl'].lower() in VALID_CONTROLLERS:
721      testing_sensor_fusion_with_controller = True
722
723  testing_flash_with_controller = False
724  if (test_params_content.get('lighting_cntl', 'None').lower() == 'arduino' and
725      'manual' not in config_file_test_key):
726    testing_flash_with_controller = True
727
728  # Expand GROUPED_SCENES and remove any duplicates
729  scenes = [_GROUPED_SCENES[s] if s in _GROUPED_SCENES else s for s in scenes]
730  scenes = np.hstack(scenes).tolist()
731  scenes = sorted(set(scenes), key=scenes.index)
732  # List of scenes to be executed in folded state will have '_folded'
733  # prefix. This will help distinguish the test results from folded vs
734  # open device state for front camera_ids.
735  folded_device_scenes = []
736  for scene in scenes:
737    folded_device_scenes.append(f'{scene}_folded')
738
739  logging.info('Running ITS on device: %s, camera(s): %s, scene(s): %s',
740               device_id, camera_id_combos, scenes)
741
742  # Determine if manual run
743  if tablet_id is not None and not set(scenes).intersection(_MANUAL_SCENES):
744    auto_scene_switch = True
745  else:
746    auto_scene_switch = False
747    logging.info('Manual, checkerboard scenes, scene5 or scene_ip testing.')
748
749  folded_prompted = False
750  opened_prompted = False
751  for camera_id in camera_id_combos:
752    test_params_content['camera'] = camera_id
753    results = {}
754    unav_cameras = []
755    # Get the list of unavailable cameras in current device state.
756    # These camera_ids should not be tested in current device state.
757    if testing_foldable_device:
758      unav_cameras = get_unavailable_physical_cameras(
759          device_id, _FRONT_CAMERA_ID)
760
761    if testing_foldable_device:
762      device_state = 'folded' if device_folded else 'opened'
763
764    testing_folded_front_camera = (testing_foldable_device and
765                                   device_folded and
766                                   _FRONT_CAMERA_ID in camera_id)
767
768    # Raise an assertion error if there is any camera unavailable in
769    # current device state. Usually scenes with suffix 'folded' will
770    # be executed in folded state.
771    if (testing_foldable_device and
772        _FRONT_CAMERA_ID in camera_id and camera_id in unav_cameras):
773      raise AssertionError(
774          f'Camera {camera_id} is unavailable in device state {device_state}'
775          f' and cannot be tested with device {device_state}!')
776
777    if (testing_folded_front_camera and camera_id not in unav_cameras
778        and not folded_prompted):
779      folded_prompted = True
780      input('\nYou are testing a foldable device in folded state. '
781            'Please make sure the device is folded and press <ENTER> '
782            'after positioning properly.\n')
783
784    if (testing_foldable_device and
785        not device_folded and _FRONT_CAMERA_ID in camera_id and
786        camera_id not in unav_cameras and not opened_prompted):
787      opened_prompted = True
788      input('\nYou are testing a foldable device in opened state. '
789            'Please make sure the device is unfolded and press <ENTER> '
790            'after positioning properly.\n')
791
792    # Run through all scenes if user does not supply one and config file doesn't
793    # have specific scene name listed.
794    if TEST_KEY_SENSOR_FUSION in config_file_test_key:
795      possible_scenes = _CHECKERBOARD_SCENES
796    elif its_session_utils.SUB_CAMERA_SEPARATOR in camera_id:
797      possible_scenes = list(SUB_CAMERA_TESTS.keys())
798      if auto_scene_switch:
799        possible_scenes.remove('sensor_fusion')
800      if 'scene_tele' in scenes:
801        possible_scenes = _TELE_SCENES
802    else:
803      if 'scene_extensions' in scenes:
804        possible_scenes = _EXTENSIONS_SCENES
805      elif 'scene_tele' in scenes:
806        possible_scenes = _TELE_SCENES
807      elif 'scene_ip' in scenes:
808        possible_scenes = _GEN2_RIG_SCENES
809      else:
810        possible_scenes = _TABLET_SCENES if auto_scene_switch else _ALL_SCENES
811    if ('<scene-name>' in scenes or 'checkerboard' in scenes or
812        'scene_extensions' in scenes or 'scene_tele' in scenes
813        or 'scene_ip' in scenes):
814      per_camera_scenes = possible_scenes
815    else:
816      # Validate user input scene names
817      per_camera_scenes = []
818      for s in scenes:
819        if s in possible_scenes:
820          per_camera_scenes.append(s)
821      if not per_camera_scenes:
822        raise ValueError('No valid scene specified for this camera.')
823
824    # Folded state scenes will have 'folded' suffix only for
825    # front cameras since rear cameras are common in both folded
826    # and unfolded state.
827    foldable_per_camera_scenes = []
828    if testing_folded_front_camera:
829      if camera_id not in available_camera_ids_to_test_foldable:
830        raise AssertionError(f'camera {camera_id} is not available.')
831      for s in per_camera_scenes:
832        foldable_per_camera_scenes.append(f'{s}_folded')
833
834    if foldable_per_camera_scenes:
835      per_camera_scenes = foldable_per_camera_scenes
836
837    logging.info('camera: %s, scene(s): %s', camera_id, per_camera_scenes)
838
839    if testing_folded_front_camera:
840      all_scenes = [f'{s}_folded' for s in _ALL_SCENES]
841    else:
842      all_scenes = _ALL_SCENES
843
844    for s in all_scenes:
845      results[s] = {RESULT_KEY: RESULT_NOT_EXECUTED}
846
847      # assert device folded testing scenes with suffix 'folded'
848      if testing_foldable_device and 'folded' in s:
849        if not device_folded:
850          raise AssertionError('Device should be folded during'
851                               ' testing scenes with suffix "folded"')
852
853    # A subdir in topdir will be created for each camera_id. All scene test
854    # output logs for each camera id will be stored in this subdir.
855    # This output log path is a mobly param : LogPath
856    camera_id_str = (
857        camera_id.replace(its_session_utils.SUB_CAMERA_SEPARATOR, '_')
858    )
859    mobly_output_logs_path = os.path.join(topdir, f'cam_id_{camera_id_str}')
860    os.mkdir(mobly_output_logs_path)
861    tot_pass = 0
862    for s in per_camera_scenes:
863      results[s]['TEST_STATUS'] = []
864      results[s][METRICS_KEY] = []
865      results[s][PERFORMANCE_KEY] = []
866      results[s][FEATURE_QUERY_KEY] = []
867      results[s][FEATURE_QUERY_PATH_KEY] = []
868
869      # unit is millisecond for execution time record in CtsVerifier
870      scene_start_time = int(round(time.time() * 1000))
871      scene_test_summary = f'Cam{camera_id} {s}' + '\n'
872      mobly_scene_output_logs_path = os.path.join(mobly_output_logs_path, s)
873
874      # Since test directories do not have 'folded' in the name, we need
875      # to remove that suffix for the path of the scenes to be loaded
876      # on the tablets
877      testing_scene = s
878      if 'folded' in s:
879        testing_scene = s.split('_folded')[0]
880      test_params_content['scene'] = testing_scene
881      test_params_content['scene_with_suffix'] = s
882
883      if auto_scene_switch:
884        # Copy scene images onto the tablet
885        if 'scene0' not in testing_scene:
886          its_session_utils.copy_scenes_to_tablet(testing_scene, tablet_id)
887      else:
888        # Check manual scenes for correctness
889        if ('scene0' not in testing_scene and
890            'scene_ip' not in testing_scene and
891            not testing_sensor_fusion_with_controller):
892          check_manual_scenes(device_id, camera_id, testing_scene,
893                              mobly_output_logs_path)
894
895      scene_test_list = []
896      config_file_contents['TestBeds'][0]['TestParams'] = test_params_content
897      # Add the MoblyParams to config.yml file with the path to store camera_id
898      # test results. This is a separate dict other than TestBeds.
899      mobly_params_dict = {
900          'MoblyParams': {
901              'LogPath': mobly_scene_output_logs_path
902          }
903      }
904      config_file_contents.update(mobly_params_dict)
905      logging.debug('Final config file contents: %s', config_file_contents)
906      new_yml_file_name = get_updated_yml_file(config_file_contents)
907      logging.info('Using %s as temporary config yml file', new_yml_file_name)
908      if camera_id.rfind(its_session_utils.SUB_CAMERA_SEPARATOR) == -1:
909        scene_dir = os.listdir(
910            os.path.join(os.environ['CAMERA_ITS_TOP'], 'tests', testing_scene))
911        for file_name in scene_dir:
912          if file_name.endswith('.py') and 'test' in file_name:
913            scene_test_list.append(file_name)
914      else:  # sub-camera
915        if SUB_CAMERA_TESTS.get(testing_scene):
916          scene_test_list = [f'{test}.py' for test in SUB_CAMERA_TESTS[
917              testing_scene]]
918        else:
919          scene_test_list = []
920      scene_test_list.sort()
921
922      # Run tests for scene
923      logging.info('Running tests for %s with camera %s',
924                   testing_scene, camera_id)
925      num_pass = 0
926      num_skip = 0
927      num_not_mandated_fail = 0
928      num_fail = 0
929      for test in scene_test_list:
930        # Handle repeated test
931        if 'tests/' in test:
932          cmd = [
933              'python3',
934              os.path.join(os.environ['CAMERA_ITS_TOP'], test), '-c',
935              f'{new_yml_file_name}'
936          ]
937        else:
938          cmd = [
939              'python3',
940              os.path.join(os.environ['CAMERA_ITS_TOP'], 'tests',
941                           testing_scene, test),
942              '-c',
943              f'{new_yml_file_name}'
944          ]
945        return_string = ''
946        for num_try in range(NUM_TRIES):
947          # Saves to mobly test summary file
948          # print only messages for manual lighting control testing
949          output = subprocess.Popen(
950              cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
951          )
952          with output.stdout, open(
953              os.path.join(topdir, MOBLY_TEST_SUMMARY_TXT_FILE), 'wb'
954          ) as file:
955            for line in iter(output.stdout.readline, b''):
956              out = line.decode('utf-8').strip()
957              if '<ENTER>' in out: print(out)
958              file.write(line)
959          output.wait()
960
961          # Parse mobly logs to determine PASS/FAIL(*)/SKIP & socket FAILs
962          with open(
963              os.path.join(topdir, MOBLY_TEST_SUMMARY_TXT_FILE), 'r') as file:
964            test_code = output.returncode
965            test_skipped = False
966            test_not_yet_mandated = False
967            test_mpc_req = ''
968            perf_test_metrics = ''
969            hdr_mpc_req = ''
970            feature_query_proto = ''
971            content = file.read()
972            root_output_path = ''
973
974            # Find media performance class logging
975            lines = content.splitlines()
976            for one_line in lines:
977              # regular expression pattern must match
978              # MPC12_CAMERA_LAUNCH_PATTERN or MPC12_JPEG_CAPTURE_PATTERN in
979              # ItsTestActivity.java.
980              mpc_string_match = re.search(
981                  '^(1080p_jpeg_capture_time_ms:|camera_launch_time_ms:)',
982                  one_line)
983              if mpc_string_match:
984                test_mpc_req = one_line
985                break
986
987            for one_line in lines:
988              # regular expression pattern must match in ItsTestActivity.java.
989              gainmap_string_match = re.search('^has_gainmap:', one_line)
990              if gainmap_string_match:
991                hdr_mpc_req = one_line
992                break
993
994            # Find root output path used to store feature combination query
995            # proto
996            for one_line in lines:
997              if 'root_output_path:' in one_line:
998                root_output_path = one_line.split(':')[1].strip()
999            # Find feature combination query proto
1000            for one_line in lines:
1001              # regular expression pattern must match in ItsTestActivity.java.
1002              feature_comb_query_string_match = re.search(
1003                  '^feature_query_proto:(.*)', one_line
1004              )
1005              if feature_comb_query_string_match:
1006                feature_query_proto = feature_comb_query_string_match.group(1)
1007                break
1008
1009            # Find performance metrics logging
1010            for one_line in lines:
1011              # regular expression pattern must match in ItsTestActivity.java.
1012              perf_metrics_string_match = re.search(
1013                  '^test.*:',
1014                  one_line)
1015              if perf_metrics_string_match:
1016                perf_test_metrics = one_line
1017                # each test can add multiple metrics
1018                results[s][PERFORMANCE_KEY].append(perf_test_metrics)
1019
1020            if 'Test skipped' in content:
1021              return_string = 'SKIP '
1022              num_skip += 1
1023              test_skipped = True
1024              break
1025
1026            if its_session_utils.NOT_YET_MANDATED_MESSAGE in content:
1027              return_string = 'FAIL*'
1028              num_not_mandated_fail += 1
1029              test_not_yet_mandated = True
1030              break
1031
1032            if test_code == 0 and not test_skipped:
1033              return_string = 'PASS '
1034              num_pass += 1
1035              break
1036
1037            if test_code != 0 and not test_not_yet_mandated:
1038              return_string = 'FAIL '
1039              if 'Problem with socket' in content and num_try != NUM_TRIES-1:
1040                logging.info('Retry %s/%s', s, test)
1041              else:
1042                num_fail += 1
1043                break
1044            os.remove(os.path.join(topdir, MOBLY_TEST_SUMMARY_TXT_FILE))
1045        status_prefix = ''
1046        if testbed_index is not None:
1047          status_prefix = config_file_test_key + ':'
1048        logging.info('%s%s %s/%s', status_prefix, return_string, s, test)
1049        test_name = test.split('/')[-1].split('.')[0]
1050        results[s]['TEST_STATUS'].append({
1051            'test': test_name,
1052            'status': return_string.strip()})
1053        if test_mpc_req:
1054          results[s][METRICS_KEY].append(test_mpc_req)
1055        if hdr_mpc_req:
1056          results[s][METRICS_KEY].append(hdr_mpc_req)
1057        if feature_query_proto and root_output_path:
1058          results[s][FEATURE_QUERY_KEY].append(feature_query_proto)
1059          results[s][FEATURE_QUERY_PATH_KEY].append(root_output_path)
1060
1061        msg_short = f'{return_string} {test}'
1062        scene_test_summary += msg_short + '\n'
1063        if (test in _LIGHTING_CONTROL_TESTS and
1064            not testing_flash_with_controller):
1065          print('Turn lights ON in rig and press <ENTER> to continue.')
1066
1067      # unit is millisecond for execution time record in CtsVerifier
1068      scene_end_time = int(round(time.time() * 1000))
1069      skip_string = ''
1070      tot_tests = len(scene_test_list)
1071      tot_tests_run = tot_tests - num_skip
1072      if tot_tests_run != 0:
1073        tests_passed_ratio = (num_pass + num_not_mandated_fail) / tot_tests_run
1074      else:
1075        tests_passed_ratio = (num_pass + num_not_mandated_fail) / 100.0
1076      tests_passed_ratio_format = f'{(100 * tests_passed_ratio):.1f}%'
1077      if num_skip > 0:
1078        skip_string = f",{num_skip} test{'s' if num_skip > 1 else ''} skipped"
1079      test_result = (f'{num_pass + num_not_mandated_fail} / {tot_tests_run} '
1080                     f'tests passed ({tests_passed_ratio_format}){skip_string}')
1081      logging.info(test_result)
1082      if num_not_mandated_fail > 0:
1083        logging.info('(*) %s not_yet_mandated tests failed',
1084                     num_not_mandated_fail)
1085
1086      tot_pass += num_pass
1087      logging.info('scene tests: %s, Total tests passed: %s', tot_tests,
1088                   tot_pass)
1089      if tot_tests > 0:
1090        logging.info('%s compatibility score: %.f/100\n',
1091                     s, 100 * num_pass / tot_tests)
1092        scene_test_summary_path = os.path.join(mobly_scene_output_logs_path,
1093                                               'scene_test_summary.txt')
1094        with open(scene_test_summary_path, 'w') as f:
1095          f.write(scene_test_summary)
1096        results[s][RESULT_KEY] = (RESULT_PASS if num_fail == 0 else RESULT_FAIL)
1097        results[s][SUMMARY_KEY] = scene_test_summary_path
1098        results[s][TIME_KEY_START] = scene_start_time
1099        results[s][TIME_KEY_END] = scene_end_time
1100      else:
1101        logging.info('%s compatibility score: 0/100\n')
1102
1103      # Delete temporary yml file after scene run.
1104      new_yaml_file_path = os.path.join(YAML_FILE_DIR, new_yml_file_name)
1105      os.remove(new_yaml_file_path)
1106    # Log results per camera
1107    if num_testbeds is None or testbed_index == _MAIN_TESTBED:
1108      logging.info('Reporting camera %s ITS results to CtsVerifier', camera_id)
1109      logging.info('ITS results to CtsVerifier: %s', results)
1110      report_result(device_id, camera_id, tablet_name, results)
1111    else:
1112      write_result(testbed_index, device_id, camera_id, results)
1113
1114  logging.info('Test execution completed.')
1115
1116  # Power down tablet
1117  if tablet_id:
1118    cmd = f'adb -s {tablet_id} shell input keyevent KEYCODE_POWER'
1119    subprocess.Popen(cmd.split())
1120
1121  # establish connection with lighting controller
1122  lighting_cntl = test_params_content.get('lighting_cntl', 'None')
1123  lighting_ch = test_params_content.get('lighting_ch', 'None')
1124  arduino_serial_port = lighting_control_utils.lighting_control(
1125      lighting_cntl, lighting_ch)
1126
1127  # turn OFF lights
1128  lighting_control_utils.set_lighting_state(
1129      arduino_serial_port, lighting_ch, 'OFF')
1130
1131  if num_testbeds is not None:
1132    if testbed_index == _MAIN_TESTBED:
1133      logging.info('Waiting for all testbeds to finish.')
1134      start = time.time()
1135      completed_testbeds = set()
1136      while time.time() < start + MERGE_RESULTS_TIMEOUT:
1137        for i in range(num_testbeds):
1138          if os.path.isfile(f'testbed_{i}_completed.tmp'):
1139            start = time.time()
1140            completed_testbeds.add(i)
1141        # Already reported _MAIN_TESTBED's results.
1142        if len(completed_testbeds) == num_testbeds - 1:
1143          logging.info('All testbeds completed, merging results.')
1144          for parsed_id, parsed_camera, parsed_results in (
1145              parse_testbeds(completed_testbeds)):
1146            logging.debug('Parsed id: %s, parsed cam: %s, parsed results: %s',
1147                          parsed_id, parsed_camera, parsed_results)
1148            if not are_devices_similar(device_id, parsed_id):
1149              logging.error('Device %s and device %s are not the same '
1150                            'model/type/build/revision.',
1151                            device_id, parsed_id)
1152              return
1153            report_result(device_id, parsed_camera, tablet_name, parsed_results)
1154          for temp_file in glob.glob('testbed_*.tmp'):
1155            os.remove(temp_file)
1156          break
1157      else:
1158        logging.error('No testbeds finished in the last %d seconds, '
1159                      'but still expected data. '
1160                      'Completed testbed indices: %s, '
1161                      'expected number of testbeds: %d',
1162                      MERGE_RESULTS_TIMEOUT, list(completed_testbeds),
1163                      num_testbeds)
1164    else:
1165      with open(f'testbed_{testbed_index}_completed.tmp', 'w') as _:
1166        pass
1167
# Script entry point: call main() only when this file is executed directly,
# not when it is imported as a module.
1168if __name__ == '__main__':
1169  main()
1170