• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2014 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import json
16import glob
17import logging
18import os
19import os.path
20import re
21import subprocess
22import sys
23import tempfile
24import time
25
26import camera_properties_utils
27import capture_request_utils
28import image_processing_utils
29import its_session_utils
30import numpy as np
31import yaml
32
# Directory containing config.yml, read from the CAMERA_ITS_TOP environment
# variable when this module is loaded (an unset variable raises KeyError).
YAML_FILE_DIR = os.environ['CAMERA_ITS_TOP']
CONFIG_FILE = os.path.join(YAML_FILE_DIR, 'config.yml')
# Substrings searched for in the lower-cased testbed name.
TEST_KEY_TABLET = 'tablet'
TEST_KEY_SENSOR_FUSION = 'sensor_fusion'
LOAD_SCENE_DELAY = 1  # seconds
ACTIVITY_START_WAIT = 1.5  # seconds
MERGE_RESULTS_TIMEOUT = 3600  # seconds

# Upper bound of the per-test retry loop.
NUM_TRIES = 2
# Per-scene result values and the dictionary keys used in a results dict.
RESULT_PASS = 'PASS'
RESULT_FAIL = 'FAIL'
RESULT_NOT_EXECUTED = 'NOT_EXECUTED'
RESULT_KEY = 'result'
METRICS_KEY = 'mpc_metrics'
SUMMARY_KEY = 'summary'
RESULT_VALUES = {RESULT_PASS, RESULT_FAIL, RESULT_NOT_EXECUTED}
# CtsVerifier package, activity and intent identifiers used in adb commands.
CTS_VERIFIER_PACKAGE_NAME = 'com.android.cts.verifier'
ITS_TEST_ACTIVITY = 'com.android.cts.verifier/.camera.its.ItsTestActivity'
ACTION_ITS_RESULT = 'com.android.cts.verifier.camera.its.ACTION_ITS_RESULT'
EXTRA_VERSION = 'camera.its.extra.VERSION'
CURRENT_ITS_VERSION = '1.0'  # version number to sync with CtsVerifier
EXTRA_CAMERA_ID = 'camera.its.extra.CAMERA_ID'
EXTRA_RESULTS = 'camera.its.extra.RESULTS'
TIME_KEY_START = 'start'
TIME_KEY_END = 'end'
# Accepted (lower-cased) values of the 'rotator_cntl' test parameter.
VALID_CONTROLLERS = ('arduino', 'canakit')
_INT_STR_DICT = {'11': '1_1', '12': '1_2'}  # recover replaced '_' in scene def
_FRONT_CAMERA_ID = '1'
# Device properties compared by are_devices_similar().
_PROPERTIES_TO_MATCH = (
    'ro.product.model', 'ro.product.name', 'ro.build.display.id', 'ro.revision'
)
_MAIN_TESTBED = 0
65
# All possible scenes
# Notes on scene names:
#   scene*_1/2/... are same scene split to load balance run times for scenes
#   scene*_a/b/... are similar scenes that share one or more tests
_ALL_SCENES = [
    'scene0', 'scene1_1', 'scene1_2', 'scene2_a', 'scene2_b', 'scene2_c',
    'scene2_d', 'scene2_e', 'scene2_f', 'scene3', 'scene4', 'scene5',
    'scene6', os.path.join('scene_extensions', 'scene_hdr'),
    os.path.join('scene_extensions', 'scene_night'), 'sensor_fusion'
]

# Scenes that can be automated through tablet display
# (same as _ALL_SCENES minus 'scene5' and 'sensor_fusion').
_AUTO_SCENES = [
    'scene0', 'scene1_1', 'scene1_2', 'scene2_a', 'scene2_b', 'scene2_c',
    'scene2_d', 'scene2_e', 'scene2_f', 'scene3', 'scene4', 'scene6',
    os.path.join('scene_extensions', 'scene_hdr'),
    os.path.join('scene_extensions', 'scene_night')
]

# Scenes that are logically grouped and can be called as group
# (a group name given as a scene is expanded to the listed member scenes).
_GROUPED_SCENES = {
        'scene1': ['scene1_1', 'scene1_2'],
        'scene2': ['scene2_a', 'scene2_b', 'scene2_c', 'scene2_d', 'scene2_e',
                   'scene2_f']
}

# Scenes that have to be run manually regardless of configuration
_MANUAL_SCENES = ['scene5']

# Scene requirements for manual testing.
# Maps scene name to the setup description shown in the operator prompt;
# None means there is no description for that scene.
_SCENE_REQ = {
    'scene0': None,
    'scene1_1': 'A grey card covering at least the middle 30% of the scene',
    'scene1_2': 'A grey card covering at least the middle 30% of the scene',
    'scene2_a': 'The picture with 3 faces in tests/scene2_a/scene2_a.png',
    'scene2_b': 'The picture with 3 faces in tests/scene2_b/scene2_b.png',
    'scene2_c': 'The picture with 3 faces in tests/scene2_c/scene2_c.png',
    'scene2_d': 'The picture with 3 faces in tests/scene2_d/scene2_d.png',
    'scene2_e': 'The picture with 3 faces in tests/scene2_e/scene2_e.png',
    'scene2_f': 'The picture with 3 faces in tests/scene2_f/scene2_f.png',
    'scene3': 'The ISO12233 chart',
    'scene4': 'A test chart of a circle covering at least the middle 50% of '
              'the scene. See tests/scene4/scene4.png',
    'scene5': 'Capture images with a diffuser attached to the camera. '
              'See source.android.com/docs/compatibility/cts/camera-its-tests#scene5/diffuser '
              'for more details',
    'scene6': 'A grid of black circles on a white background. '
              'See tests/scene6/scene6.png',
    # Use os.path to avoid confusion on other platforms
    os.path.join('scene_extensions', 'scene_hdr'): (
        'A tablet displayed scene with a face on the left '
        'and a low-contrast QR code on the right. '
        'See tests/scene_extensions/scene_hdr/scene_hdr.png'
    ),
    os.path.join('scene_extensions', 'scene_night'): (
        'A tablet displayed scene with a white circle '
        'and four smaller circles inside of it. '
        'See tests/scene_extensions/scene_night/scene_night.png'
    ),
    'sensor_fusion': 'A checkerboard pattern for phone to rotate in front of '
                     'in tests/sensor_fusion/checkerboard.pdf\n'
                     'See tests/sensor_fusion/SensorFusion.pdf for detailed '
                     'instructions.\nNote that this test will be skipped '
                     'on devices not supporting REALTIME camera timestamp.',
}
131
132
# Tests (module names without the '.py' suffix) to run per scene when the
# camera id contains the sub-camera separator. Scenes that are not listed
# here get an empty test list for such cameras.
SUB_CAMERA_TESTS = {
    'scene0': [
        'test_burst_capture',
        'test_jitter',
        'test_metadata',
        'test_read_write',
        'test_sensor_events',
        'test_solid_color_test_pattern',
        'test_unified_timestamps',
    ],
    'scene1_1': [
        'test_burst_sameness_manual',
        'test_dng_noise_model',
        'test_exposure',
        'test_linearity',
    ],
    'scene1_2': [
        'test_raw_exposure',
        'test_raw_sensitivity',
        'test_yuv_plus_raw',
    ],
    'scene2_a': [
        'test_num_faces',
    ],
    'scene4': [
        'test_aspect_ratio_and_crop',
    ],
    'sensor_fusion': [
        'test_sensor_fusion',
    ],
}

# NOTE(review): presumably the tests that depend on the lighting controller;
# the consumer of this list is not visible in this part of the file.
_LIGHTING_CONTROL_TESTS = [
    'test_auto_flash.py',
    'test_preview_min_frame_rate.py',
    'test_led_snapshot.py',
    'test_night_extension.py',
    'test_hdr_extension.py',
    ]

# Destination directory on the tablet for pushed scene images.
_DST_SCENE_DIR = '/sdcard/Download/'
MOBLY_TEST_SUMMARY_TXT_FILE = 'test_mobly_summary.txt'
175
176
def run(cmd):
  """Executes a command quietly and raises if it exits with a non-zero code.

  Args:
    cmd: the command line as one string. It is split on whitespace (no shell
      quoting rules are applied) to build the argument vector.

  Raises:
    subprocess.CalledProcessError: the command returned a non-zero exit code.
  """
  argv = cmd.split()
  # Discard stdout and fold stderr into it so that both streams are silenced.
  subprocess.check_call(
      argv, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
181
182
def check_cts_apk_installed(device_id):
  """Verifies that CtsVerifier.apk is installed on a given device.

  Args:
    device_id: the ID string of the device to query.

  Raises:
    AssertionError: the CTS Verifier package is not in the package list.
    subprocess.CalledProcessError: the adb command itself failed.
  """
  # The package list is filtered here instead of being piped through grep:
  # grep exits non-zero when nothing matches, which made check_output raise
  # CalledProcessError before the AssertionError below could be reported.
  # Passing an argument list also avoids going through a local shell.
  list_packages_cmd = [
      'adb', '-s', str(device_id), 'shell', 'pm', 'list', 'packages'
  ]
  raw_output = subprocess.check_output(
      list_packages_cmd, stderr=subprocess.STDOUT
  )
  output = raw_output.decode('utf-8').strip()
  if CTS_VERIFIER_PACKAGE_NAME not in output:
    raise AssertionError(
        f"{CTS_VERIFIER_PACKAGE_NAME} was not found in {device_id}'s list of packages!"
    )
194
195
def report_result(device_id, camera_id, results):
  """Sends a pass/fail result to the device, via an intent.

  Args:
   device_id: The ID string of the device to report the results to.
   camera_id: The ID string of the camera for which to report pass/fail.
   results: a dictionary contains all ITS scenes as key and result/summary of
            current ITS run. See test_report_result unit test for an example.
            Entries that carry a summary are updated in place: the local
            summary path is replaced by the path it was pushed to on device.

  Raises:
    ValueError: a scene entry has no result, or its result is not one of
      RESULT_VALUES.
  """
  adb = f'adb -s {device_id}'
  initialization_cmds = (
      f'{adb} shell input keyevent KEYCODE_WAKEUP',
      f'{adb} shell input keyevent KEYCODE_MENU',
      (f'{adb} shell am start -n {ITS_TEST_ACTIVITY} '
       '--activity-brought-to-front')
  )
  # Awaken if necessary and start ItsTestActivity to receive test results
  for cmd in initialization_cmds:
    run(cmd)
  time.sleep(ACTIVITY_START_WAIT)

  # Validate/process results argument
  for scene, scene_result in results.items():
    if RESULT_KEY not in scene_result:
      raise ValueError(f'ITS result not found for {scene}')
    if scene_result[RESULT_KEY] not in RESULT_VALUES:
      # Report the offending scene's own value. Looking RESULT_KEY up on the
      # top-level dict raised KeyError and hid this error.
      raise ValueError(
          f'Unknown ITS result for {scene}: {scene_result[RESULT_KEY]}')
    if SUMMARY_KEY in scene_result:
      device_summary_path = f'/sdcard/its_camera{camera_id}_{scene}.txt'
      run(f'{adb} push {scene_result[SUMMARY_KEY]} {device_summary_path}')
      # From here on the entry refers to the on-device copy of the summary.
      scene_result[SUMMARY_KEY] = device_summary_path

  json_results = json.dumps(results)
  # The single quotes stay in the argument handed to adb so that the
  # device-side shell receives the JSON payload as one word.
  cmd = (f"{adb} shell am broadcast -a {ACTION_ITS_RESULT} --es {EXTRA_VERSION}"
         f" {CURRENT_ITS_VERSION} --es {EXTRA_CAMERA_ID} {camera_id} --es "
         f"{EXTRA_RESULTS} '{json_results}'")
  if len(cmd) > 8000:
    logging.info('ITS command string might be too long! len:%s', len(cmd))
  run(cmd)
236
237
def write_result(testbed_index, device_id, camera_id, results):
  """Dumps one testbed/camera run to a JSON '.tmp' file for later merging.

  The file is created in the current working directory and is named
  'testbed_<testbed_index>_camera_<camera_id>.tmp'.

  Args:
    testbed_index: the index of a finished testbed.
    device_id: the ID string of the device that created results.
    camera_id: the ID string of the camera of the device.
    results: a dictionary that contains all ITS scenes as key
             and result/summary of current ITS run.
  """
  tmp_file_name = f'testbed_{testbed_index}_camera_{camera_id}.tmp'
  payload = dict(device_id=device_id, results=results)
  with open(tmp_file_name, 'w') as tmp_file:
    json.dump(payload, tmp_file)
252
253
def parse_testbeds(completed_testbeds):
  """Reads back the '.tmp' result files of the given testbeds.

  For every index, each file in the current working directory that matches
  'testbed_<index>_camera_*.tmp' is loaded as JSON.

  Args:
    completed_testbeds: an iterable of completed testbed indices.
  Yields:
    device_id: the device associated with the testbed.
    camera_id: one of the camera_ids associated with the testbed, taken from
      the file name.
    results: the dictionary with scenes and result/summary of testbed's run.
  Raises:
    ValueError: a file holds an empty device_id or empty results.
  """
  for index in completed_testbeds:
    pattern = f'testbed_{index}_camera_*.tmp'
    for tmp_path in glob.glob(pattern):
      # The camera id is the text between 'camera_' and the '.tmp' suffix.
      after_marker = tmp_path.split('camera_')[1]
      camera_id = after_marker.split('.tmp')[0]
      with open(tmp_path, 'r') as tmp_file:
        payload = json.load(tmp_file)
        device_id = payload['device_id']
        results = payload['results']
      if not (device_id and results):
        raise ValueError(f'device_id or results for {tmp_path} not found.')
      yield device_id, camera_id, results
276
277
def get_device_property(device_id, property_name):
  """Queries a system property of a device with 'adb shell getprop'.

  Args:
    device_id: the ID string of a device.
    property_name: the desired property string.
  Returns:
    The command output (stdout and stderr combined) decoded as UTF-8, with
    surrounding whitespace stripped.
  Raises:
    subprocess.CalledProcessError: the command exited with a non-zero code.
  """
  getprop_cmd = f'adb -s {device_id} shell getprop {property_name}'
  stdout_bytes = subprocess.check_output(
      getprop_cmd, shell=True, stderr=subprocess.STDOUT)
  return stdout_bytes.decode('utf-8').strip()
291
292
def are_devices_similar(device_id_1, device_id_2):
  """Compares the properties in _PROPERTIES_TO_MATCH between two devices.

  The comparison stops at the first property whose values differ, which is
  logged as an error.

  Args:
    device_id_1: the ID string of the _MAIN_TESTBED device.
    device_id_2: the ID string of another device.
  Returns:
    True if every compared property has the same value on both devices,
    False otherwise.
  """
  for prop in _PROPERTIES_TO_MATCH:
    value_1 = get_device_property(device_id_1, prop)
    value_2 = get_device_property(device_id_2, prop)
    if value_1 == value_2:
      continue
    logging.error('%s does not match %s for %s', value_1, value_2, prop)
    return False
  return True
310
311
def load_scenes_on_tablet(scene, tablet_id):
  """Pushes the '.png' files of a scene directory to the tablet.

  Every '.png' file found in $CAMERA_ITS_TOP/tests/<scene> is pushed to
  _DST_SCENE_DIR with 'adb push'.

  Args:
    scene: Name of the scene to copy image files.
    tablet_id: adb id of tablet
  """
  logging.info('Copying files to tablet: %s', tablet_id)
  scene_path = os.path.join(os.environ['CAMERA_ITS_TOP'], 'tests', scene)
  png_names = [name for name in os.listdir(scene_path)
               if name.endswith('.png')]
  for png_name in png_names:
    src_scene_file = os.path.join(scene_path, png_name)
    push_cmd = f'adb -s {tablet_id} push {src_scene_file} {_DST_SCENE_DIR}'
    # NOTE(review): the push is started but never waited on; the fixed delay
    # below is the only allowance for it to complete. The command is split on
    # whitespace, so a source path containing spaces would be broken up.
    subprocess.Popen(push_cmd.split())
  time.sleep(LOAD_SCENE_DELAY)
  logging.info('Finished copying files to tablet.')
330
331
def check_manual_scenes(device_id, camera_id, scene, out_path):
  """Pauses the run until the operator confirms the manual scene setup.

  In a loop: prompts the operator to position the camera, converges 3A,
  captures one image, writes it to <out_path>/test_<scene>.jpg and asks
  whether it looks right. The loop ends when the answer is 'y' (any case).

  Args:
    device_id: id of device
    camera_id: id of camera
    scene: Name of the scene being set up.
    out_path: output file location
  """
  with its_session_utils.ItsSession(
      device_id=device_id, camera_id=camera_id) as cam:
    props = cam.override_with_hidden_physical_camera_props(
        cam.get_camera_properties())
    img_name = os.path.join(out_path, f'test_{scene}.jpg')

    scene_confirmed = False
    while not scene_confirmed:
      input(f'\n Press <ENTER> after positioning camera {camera_id} with '
            f'{scene}.\n The scene setup should be: \n  {_SCENE_REQ[scene]}\n')
      # Converge 3A prior to capture; scene5 runs it without AF and with the
      # AE/AWB lock arguments taken from the camera properties.
      if scene != 'scene5':
        cam.do_3a()
      else:
        cam.do_3a(do_af=False, lock_ae=camera_properties_utils.ae_lock(props),
                  lock_awb=camera_properties_utils.awb_lock(props))
      req, fmt = capture_request_utils.get_fastest_auto_capture_settings(props)
      logging.info('Capturing an image to check the test scene')
      capture = cam.do_capture(req, fmt)
      rgb_image = image_processing_utils.convert_capture_to_rgb_image(capture)
      logging.info('Please check scene setup in %s', img_name)
      image_processing_utils.write_image(rgb_image, img_name)
      answer = input(f'Is the image okay for ITS {scene}? (Y/N)')
      scene_confirmed = answer.lower() == 'y'
366
367
def get_config_file_contents():
  """Loads CONFIG_FILE and returns its parsed YAML content.

  Args:
    None

  Returns:
    config_file_contents: the object produced by yaml.safe_load for config.yml
  """
  with open(CONFIG_FILE) as config_stream:
    return yaml.safe_load(config_stream)
380
381
def get_test_params(config_file_contents):
  """Extracts the 'TestParams' entry from the config file contents.

  Every entry of every top-level value is visited and the last one visited
  decides the result, even when it has no 'TestParams' key.

  Args:
    config_file_contents: dict read from config.yml file

  Returns:
    The 'TestParams' value of the last entry visited, or None if that entry
    has none or there are no entries at all.
  """
  params = None
  for entries in config_file_contents.values():
    for entry in entries:
      # Unconditional overwrite: later entries replace earlier ones.
      params = entry.get('TestParams')
  return params
396
397
def get_device_serial_number(device, config_file_contents):
  """Returns the serial number of the device with label from the config file.

  The config file contains TestBeds dictionary which contains Controllers and
  Android Device dicts. The two devices used by the test per box are listed
  here with labels dut and tablet. Parse through the nested TestBeds dict to
  get the Android device details.

  Args:
    device: String device label as specified in config file: 'tablet' selects
      the tablet, any other value selects the dut.
    config_file_contents: dict read from config.yml file

  Returns:
    The serial of the requested device as a string. If several devices carry
    the same label, the last one found is returned.

  Raises:
    ValueError: no device with the requested label is present in the config.
  """
  serials = {}
  for testbeds in config_file_contents.values():
    for testbed in testbeds:
      # Tolerate testbeds that define no controllers or no Android devices.
      controllers = testbed.get('Controllers') or {}
      for device_dict in controllers.get('AndroidDevice') or []:
        device_values = list(device_dict.values())
        for label in ('tablet', 'dut'):
          # A device matches when any of its values equals the label.
          if label in device_values:
            serials[label] = str(device_dict.get('serial'))

  wanted_label = 'tablet' if device == 'tablet' else 'dut'
  if wanted_label not in serials:
    # Fail with a descriptive error rather than an unbound local variable.
    raise ValueError(
        f'No AndroidDevice labeled {wanted_label!r} found in config file.')
  return serials[wanted_label]
424
425
def get_updated_yml_file(yml_file_contents):
  """Writes the testbed contents to a new temporary yml file.

  This testbed file is per box and contains all the parameters and
  device id used by the mobly tests. The file is created in YAML_FILE_DIR
  with a 'config_' prefix and a '.yml' suffix; the directory mode is set to
  0o755 first.

  Args:
   yml_file_contents: Data to write in yml file.

  Returns:
    The base name (no directory part) of the newly created yml file.
  """
  os.chmod(YAML_FILE_DIR, 0o755)
  yml_fd, yml_path = tempfile.mkstemp(
      suffix='.yml', prefix='config_', dir=YAML_FILE_DIR)
  # Write through the descriptor returned by mkstemp.
  with os.fdopen(yml_fd, 'w') as yml_stream:
    yaml.dump(yml_file_contents, stream=yml_stream, default_flow_style=False)
  return os.path.basename(yml_path)
446
447
def enable_external_storage(device_id):
  """Allows com.android.cts.verifier to manage external storage.

  Sets the MANAGE_EXTERNAL_STORAGE app op to 'allow' through adb.

  Args:
    device_id: Serial number of the device.

  """
  appops_cmd = ' '.join((
      f'adb -s {device_id} shell appops',
      'set com.android.cts.verifier MANAGE_EXTERNAL_STORAGE allow',
  ))
  run(appops_cmd)
458
459
def get_available_cameras(device_id, camera_id):
  """Lists the camera ids reported by the device minus unavailable ones.

  The unavailable physical cameras reported for camera_id are turned into
  '<camera_id>.<physical_id>' strings and removed from the reported id list.

  Args:
    device_id: Serial number of the device.
    camera_id: Logical camera_id

  Returns:
    List of all the available camera_ids.
  """
  with its_session_utils.ItsSession(
      device_id=device_id, camera_id=camera_id) as cam:
    # The property results are not used below; the two calls are kept so the
    # session sees the same request sequence.
    props = cam.get_camera_properties()
    cam.override_with_hidden_physical_camera_props(props)
    unavailable_reply = cam.get_unavailable_physical_cameras(camera_id)
    unavailable_physical_ids = unavailable_reply[
        'unavailablePhysicalCamerasArray']
    camera_ids = cam.get_camera_ids()['cameraIdArray']
    for physical_id in unavailable_physical_ids:
      # Concat camera_id, physical camera_id and sub camera separator
      qualified_id = f'{camera_id}.{physical_id}'
      if qualified_id in camera_ids:
        camera_ids.remove(qualified_id)
    logging.debug('available camera ids: %s', camera_ids)
  return camera_ids
489
490
def get_unavailable_physical_cameras(device_id, camera_id):
  """Lists the physical cameras reported as unavailable for a camera.

  Args:
    device_id: Serial number of the device.
    camera_id: Logical camera device id

  Returns:
    List of the unavailable camera ids, each formatted as
    '<camera_id>.<physical_id>'.
  """
  with its_session_utils.ItsSession(
      device_id=device_id, camera_id=camera_id) as cam:
    reply = cam.get_unavailable_physical_cameras(camera_id)
    qualified_ids = []
    for physical_id in reply['unavailablePhysicalCamerasArray']:
      qualified_ids.append(f'{camera_id}.{physical_id}')
    logging.debug('Unavailable physical camera ids: %s', qualified_ids)
  return qualified_ids
513
514
def is_device_folded(device_id):
  """Reports whether the foldable device is in folded state.

  Runs 'adb shell cmd device_state state' and looks for 'CLOSE' in its output.

  Args:
    device_id: Serial number of the foldable device.

  Returns:
    True if the command output contains 'CLOSE', False otherwise.
  """
  state_output = subprocess.getoutput(
      f'adb -s {device_id} shell cmd device_state state')
  return 'CLOSE' in state_output
526
527
528def main():
529  """Run all the Camera ITS automated tests.
530
531    Script should be run from the top-level CameraITS directory.
532
533    Command line arguments:
534        camera:  the camera(s) to be tested. Use comma to separate multiple
535                 camera Ids. Ex: "camera=0,1" or "camera=1"
536        scenes:  the test scene(s) to be executed. Use comma to separate
537                 multiple scenes. Ex: "scenes=scene0,scene1_1" or
538                 "scenes=0,1_1,sensor_fusion" (sceneX can be abbreviated by X
539                 where X is scene name minus 'scene')
540  """
541  logging.basicConfig(level=logging.INFO)
542  # Make output directories to hold the generated files.
543  topdir = tempfile.mkdtemp(prefix='CameraITS_')
544  try:
545    subprocess.call(['chmod', 'g+rx', topdir])
546  except OSError as e:
547    logging.info(repr(e))
548
549  scenes = []
550  camera_id_combos = []
551  testbed_index = None
552  num_testbeds = None
553  # Override camera, scenes and testbed with cmd line values if available
554  for s in list(sys.argv[1:]):
555    if 'scenes=' in s:
556      scenes = s.split('=')[1].split(',')
557    elif 'camera=' in s:
558      camera_id_combos = s.split('=')[1].split(',')
559    elif 'testbed_index=' in s:
560      testbed_index = int(s.split('=')[1])
561    elif 'num_testbeds=' in s:
562      num_testbeds = int(s.split('=')[1])
563    else:
564      raise ValueError(f'Unknown argument {s}')
565  if testbed_index is None and num_testbeds is not None:
566    raise ValueError(
567        'testbed_index must be specified if num_testbeds is specified.')
568  if (testbed_index is not None and num_testbeds is not None and
569      testbed_index >= num_testbeds):
570    raise ValueError('testbed_index must be less than num_testbeds. '
571                     'testbed_index starts at 0.')
572
573  # Read config file and extract relevant TestBed
574  config_file_contents = get_config_file_contents()
575  if testbed_index is None:
576    for i in config_file_contents['TestBeds']:
577      if scenes == ['sensor_fusion']:
578        if TEST_KEY_SENSOR_FUSION not in i['Name'].lower():
579          config_file_contents['TestBeds'].remove(i)
580      else:
581        if TEST_KEY_SENSOR_FUSION in i['Name'].lower():
582          config_file_contents['TestBeds'].remove(i)
583  else:
584    config_file_contents = {
585        'TestBeds': [config_file_contents['TestBeds'][testbed_index]]
586    }
587
588  # Get test parameters from config file
589  test_params_content = get_test_params(config_file_contents)
590  if not camera_id_combos:
591    camera_id_combos = str(test_params_content['camera']).split(',')
592  if not scenes:
593    scenes = str(test_params_content['scene']).split(',')
594    scenes = [_INT_STR_DICT.get(n, n) for n in scenes]  # recover '1_1' & '1_2'
595
596  device_id = get_device_serial_number('dut', config_file_contents)
597  # Enable external storage on DUT to send summary report to CtsVerifier.apk
598  enable_external_storage(device_id)
599
600  # Verify that CTS Verifier is installed
601  check_cts_apk_installed(device_id)
602  # Check whether the dut is foldable or not
603  testing_foldable_device = True if test_params_content[
604      'foldable_device'] == 'True' else False
605  available_camera_ids_to_test_foldable = []
606  if testing_foldable_device:
607    logging.debug('Testing foldable device.')
608    # Check the state of foldable device. True if device is folded,
609    # false if the device is opened.
610    device_folded = is_device_folded(device_id)
611    # list of available camera_ids to be tested in device state
612    available_camera_ids_to_test_foldable = get_available_cameras(
613        device_id, _FRONT_CAMERA_ID)
614
615  config_file_test_key = config_file_contents['TestBeds'][0]['Name'].lower()
616  logging.info('Saving %s output files to: %s', config_file_test_key, topdir)
617  if TEST_KEY_TABLET in config_file_test_key:
618    tablet_id = get_device_serial_number('tablet', config_file_contents)
619    tablet_name_cmd = f'adb -s {tablet_id} shell getprop ro.build.product'
620    raw_output = subprocess.check_output(
621        tablet_name_cmd, stderr=subprocess.STDOUT, shell=True)
622    tablet_name = str(raw_output.decode('utf-8')).strip()
623    logging.debug('Tablet name: %s', tablet_name)
624    brightness = test_params_content['brightness']
625    its_session_utils.validate_tablet_brightness(tablet_name, brightness)
626  else:
627    tablet_id = None
628
629  testing_sensor_fusion_with_controller = False
630  if TEST_KEY_SENSOR_FUSION in config_file_test_key:
631    if test_params_content['rotator_cntl'].lower() in VALID_CONTROLLERS:
632      testing_sensor_fusion_with_controller = True
633
634  testing_flash_with_controller = False
635  if (TEST_KEY_TABLET in config_file_test_key or
636      'manual' in config_file_test_key):
637    if test_params_content.get('lighting_cntl', 'None').lower() == 'arduino':
638      testing_flash_with_controller = True
639
640  # Prepend 'scene' if not specified at cmd line
641  for i, s in enumerate(scenes):
642    if (not s.startswith('scene') and
643        not s.startswith(('sensor_fusion', '<scene-name>'))):
644      scenes[i] = f'scene{s}'
645
646  # Expand GROUPED_SCENES and remove any duplicates
647  scenes = [_GROUPED_SCENES[s] if s in _GROUPED_SCENES else s for s in scenes]
648  scenes = np.hstack(scenes).tolist()
649  scenes = sorted(set(scenes), key=scenes.index)
650  # List of scenes to be executed in folded state will have '_folded'
651  # prefix. This will help distinguish the test results from folded vs
652  # open device state for front camera_ids.
653  folded_device_scenes = []
654  for scene in scenes:
655    folded_device_scenes.append(f'{scene}_folded')
656
657  logging.info('Running ITS on device: %s, camera(s): %s, scene(s): %s',
658               device_id, camera_id_combos, scenes)
659
660  # Determine if manual run
661  if tablet_id is not None and not set(scenes).intersection(_MANUAL_SCENES):
662    auto_scene_switch = True
663  else:
664    auto_scene_switch = False
665    logging.info('No tablet: manual, sensor_fusion, or scene5 testing.')
666
667  folded_prompted = False
668  opened_prompted = False
669  for camera_id in camera_id_combos:
670    test_params_content['camera'] = camera_id
671    results = {}
672    unav_cameras = []
673    # Get the list of unavailable cameras in current device state.
674    # These camera_ids should not be tested in current device state.
675    if testing_foldable_device:
676      unav_cameras = get_unavailable_physical_cameras(
677          device_id, _FRONT_CAMERA_ID)
678
679    if testing_foldable_device:
680      device_state = 'folded' if device_folded else 'opened'
681
682    testing_folded_front_camera = (testing_foldable_device and
683                                   device_folded and
684                                   _FRONT_CAMERA_ID in camera_id)
685
686    # Raise an assertion error if there is any camera unavailable in
687    # current device state. Usually scenes with suffix 'folded' will
688    # be executed in folded state.
689    if (testing_foldable_device and
690        _FRONT_CAMERA_ID in camera_id and camera_id in unav_cameras):
691      raise AssertionError(
692          f'Camera {camera_id} is unavailable in device state {device_state}'
693          f' and cannot be tested with device {device_state}!')
694
695    if (testing_folded_front_camera and camera_id not in unav_cameras
696        and not folded_prompted):
697      folded_prompted = True
698      input('\nYou are testing a foldable device in folded state. '
699            'Please make sure the device is folded and press <ENTER> '
700            'after positioning properly.\n')
701
702    if (testing_foldable_device and
703        not device_folded and _FRONT_CAMERA_ID in camera_id and
704        camera_id not in unav_cameras and not opened_prompted):
705      opened_prompted = True
706      input('\nYou are testing a foldable device in opened state. '
707            'Please make sure the device is unfolded and press <ENTER> '
708            'after positioning properly.\n')
709
710    # Run through all scenes if user does not supply one and config file doesn't
711    # have specific scene name listed.
712    if its_session_utils.SUB_CAMERA_SEPARATOR in camera_id:
713      possible_scenes = list(SUB_CAMERA_TESTS.keys())
714      if auto_scene_switch:
715        possible_scenes.remove('sensor_fusion')
716    else:
717      possible_scenes = _AUTO_SCENES if auto_scene_switch else _ALL_SCENES
718
719    if '<scene-name>' in scenes:
720      per_camera_scenes = possible_scenes
721    else:
722      # Validate user input scene names
723      per_camera_scenes = []
724      for s in scenes:
725        if s in possible_scenes:
726          per_camera_scenes.append(s)
727      if not per_camera_scenes:
728        raise ValueError('No valid scene specified for this camera.')
729
730    # Folded state scenes will have 'folded' suffix only for
731    # front cameras since rear cameras are common in both folded
732    # and unfolded state.
733    foldable_per_camera_scenes = []
734    if testing_folded_front_camera:
735      if camera_id not in available_camera_ids_to_test_foldable:
736        raise AssertionError(f'camera {camera_id} is not available.')
737      for s in per_camera_scenes:
738        foldable_per_camera_scenes.append(f'{s}_folded')
739
740    if foldable_per_camera_scenes:
741      per_camera_scenes = foldable_per_camera_scenes
742
743    logging.info('camera: %s, scene(s): %s', camera_id, per_camera_scenes)
744
745    if testing_folded_front_camera:
746      all_scenes = [f'{s}_folded' for s in _ALL_SCENES]
747    else:
748      all_scenes = _ALL_SCENES
749
750    for s in all_scenes:
751      results[s] = {RESULT_KEY: RESULT_NOT_EXECUTED}
752
753      # assert device folded testing scenes with suffix 'folded'
754      if testing_foldable_device and 'folded' in s:
755        if not device_folded:
756          raise AssertionError('Device should be folded during'
757                               ' testing scenes with suffix "folded"')
758
759    # A subdir in topdir will be created for each camera_id. All scene test
760    # output logs for each camera id will be stored in this subdir.
761    # This output log path is a mobly param : LogPath
762    cam_id_string = f"cam_id_{camera_id.replace(its_session_utils.SUB_CAMERA_SEPARATOR, '_')}"
763    mobly_output_logs_path = os.path.join(topdir, cam_id_string)
764    os.mkdir(mobly_output_logs_path)
765    tot_pass = 0
766    for s in per_camera_scenes:
767      results[s]['TEST_STATUS'] = []
768      results[s][METRICS_KEY] = []
769
770      # unit is millisecond for execution time record in CtsVerifier
771      scene_start_time = int(round(time.time() * 1000))
772      scene_test_summary = f'Cam{camera_id} {s}' + '\n'
773      mobly_scene_output_logs_path = os.path.join(mobly_output_logs_path, s)
774
775      # Since test directories do not have 'folded' in the name, we need
776      # to remove that suffix for the path of the scenes to be loaded
777      # on the tablets
778      testing_scene = s
779      if 'folded' in s:
780        testing_scene = s.split('_folded')[0]
781      test_params_content['scene'] = testing_scene
782      test_params_content['scene_with_suffix'] = s
783
784      if auto_scene_switch:
785        # Copy scene images onto the tablet
786        if 'scene0' not in testing_scene:
787          load_scenes_on_tablet(testing_scene, tablet_id)
788      else:
789        # Check manual scenes for correctness
790        if 'scene0' not in testing_scene and not testing_sensor_fusion_with_controller:
791          check_manual_scenes(device_id, camera_id, testing_scene,
792                              mobly_output_logs_path)
793
794      scene_test_list = []
795      config_file_contents['TestBeds'][0]['TestParams'] = test_params_content
796      # Add the MoblyParams to config.yml file with the path to store camera_id
797      # test results. This is a separate dict other than TestBeds.
798      mobly_params_dict = {
799          'MoblyParams': {
800              'LogPath': mobly_scene_output_logs_path
801          }
802      }
803      config_file_contents.update(mobly_params_dict)
804      logging.debug('Final config file contents: %s', config_file_contents)
805      new_yml_file_name = get_updated_yml_file(config_file_contents)
806      logging.info('Using %s as temporary config yml file', new_yml_file_name)
807      if camera_id.rfind(its_session_utils.SUB_CAMERA_SEPARATOR) == -1:
808        scene_dir = os.listdir(
809            os.path.join(os.environ['CAMERA_ITS_TOP'], 'tests', testing_scene))
810        for file_name in scene_dir:
811          if file_name.endswith('.py') and 'test' in file_name:
812            scene_test_list.append(file_name)
813      else:  # sub-camera
814        if SUB_CAMERA_TESTS.get(testing_scene):
815          scene_test_list = [f'{test}.py' for test in SUB_CAMERA_TESTS[
816              testing_scene]]
817        else:
818          scene_test_list = []
819      scene_test_list.sort()
820
821      # Run tests for scene
822      logging.info('Running tests for %s with camera %s',
823                   testing_scene, camera_id)
824      num_pass = 0
825      num_skip = 0
826      num_not_mandated_fail = 0
827      num_fail = 0
828      for test in scene_test_list:
829        # Handle repeated test
830        if 'tests/' in test:
831          cmd = [
832              'python3',
833              os.path.join(os.environ['CAMERA_ITS_TOP'], test), '-c',
834              f'{new_yml_file_name}'
835          ]
836        else:
837          cmd = [
838              'python3',
839              os.path.join(os.environ['CAMERA_ITS_TOP'], 'tests',
840                           testing_scene, test),
841              '-c',
842              f'{new_yml_file_name}'
843          ]
844        for num_try in range(NUM_TRIES):
845          # Handle manual lighting control redirected stdout in test
846          if (test in _LIGHTING_CONTROL_TESTS and
847              not testing_flash_with_controller):
848            print('Turn lights OFF in rig and press <ENTER> to continue.')
849
850          # pylint: disable=subprocess-run-check
851          with open(
852              os.path.join(topdir, MOBLY_TEST_SUMMARY_TXT_FILE), 'w') as fp:
853            output = subprocess.run(cmd, stdout=fp)
854          # pylint: enable=subprocess-run-check
855
856          # Parse mobly logs to determine SKIP, NOT_YET_MANDATED, and
857          # socket FAILs.
858          with open(
859              os.path.join(topdir, MOBLY_TEST_SUMMARY_TXT_FILE), 'r') as file:
860            test_code = output.returncode
861            test_skipped = False
862            test_not_yet_mandated = False
863            test_mpc_req = ''
864            content = file.read()
865
866            # Find media performance class logging
867            lines = content.splitlines()
868            for one_line in lines:
869              # regular expression pattern must match
870              # MPC12_CAMERA_LAUNCH_PATTERN or MPC12_JPEG_CAPTURE_PATTERN in
871              # ItsTestActivity.java.
872              mpc_string_match = re.search(
873                  '^(1080p_jpeg_capture_time_ms:|camera_launch_time_ms:)',
874                  one_line)
875              if mpc_string_match:
876                test_mpc_req = one_line
877                break
878
879            if 'Test skipped' in content:
880              return_string = 'SKIP '
881              num_skip += 1
882              test_skipped = True
883              break
884
885            if 'Not yet mandated test' in content:
886              return_string = 'FAIL*'
887              num_not_mandated_fail += 1
888              test_not_yet_mandated = True
889              break
890
891            if test_code == 0 and not test_skipped:
892              return_string = 'PASS '
893              num_pass += 1
894              break
895
896            if test_code == 1 and not test_not_yet_mandated:
897              return_string = 'FAIL '
898              if 'Problem with socket' in content and num_try != NUM_TRIES-1:
899                logging.info('Retry %s/%s', s, test)
900              else:
901                num_fail += 1
902                break
903            os.remove(os.path.join(topdir, MOBLY_TEST_SUMMARY_TXT_FILE))
904        status_prefix = ''
905        if testbed_index is not None:
906          status_prefix = config_file_test_key + ':'
907        logging.info('%s%s %s/%s', status_prefix, return_string, s, test)
908        test_name = test.split('/')[-1].split('.')[0]
909        results[s]['TEST_STATUS'].append({
910            'test': test_name,
911            'status': return_string.strip()})
912        if test_mpc_req:
913          results[s][METRICS_KEY].append(test_mpc_req)
914        msg_short = f'{return_string} {test}'
915        scene_test_summary += msg_short + '\n'
916        if test in _LIGHTING_CONTROL_TESTS and not testing_flash_with_controller:
917          print('Turn lights ON in rig and press <ENTER> to continue.')
918
919      # unit is millisecond for execution time record in CtsVerifier
920      scene_end_time = int(round(time.time() * 1000))
921      skip_string = ''
922      tot_tests = len(scene_test_list)
923      tot_tests_run = tot_tests - num_skip
924      if tot_tests_run != 0:
925        tests_passed_ratio = (num_pass + num_not_mandated_fail) / tot_tests_run
926      else:
927        tests_passed_ratio = (num_pass + num_not_mandated_fail) / 100.0
928      tests_passed_ratio_format = f'{(100 * tests_passed_ratio):.1f}%'
929      if num_skip > 0:
930        skip_string = f",{num_skip} test{'s' if num_skip > 1 else ''} skipped"
931      test_result = (f'{num_pass + num_not_mandated_fail} / {tot_tests_run} '
932                     f'tests passed ({tests_passed_ratio_format}){skip_string}')
933      logging.info(test_result)
934      if num_not_mandated_fail > 0:
935        logging.info('(*) %s not_yet_mandated tests failed',
936                     num_not_mandated_fail)
937
938      tot_pass += num_pass
939      logging.info('scene tests: %s, Total tests passed: %s', tot_tests,
940                   tot_pass)
941      if tot_tests > 0:
942        logging.info('%s compatibility score: %.f/100\n',
943                     s, 100 * num_pass / tot_tests)
944        scene_test_summary_path = os.path.join(mobly_scene_output_logs_path,
945                                               'scene_test_summary.txt')
946        with open(scene_test_summary_path, 'w') as f:
947          f.write(scene_test_summary)
948        results[s][RESULT_KEY] = (RESULT_PASS if num_fail == 0 else RESULT_FAIL)
949        results[s][SUMMARY_KEY] = scene_test_summary_path
950        results[s][TIME_KEY_START] = scene_start_time
951        results[s][TIME_KEY_END] = scene_end_time
952      else:
953        logging.info('%s compatibility score: 0/100\n')
954
955      # Delete temporary yml file after scene run.
956      new_yaml_file_path = os.path.join(YAML_FILE_DIR, new_yml_file_name)
957      os.remove(new_yaml_file_path)
958
959    # Log results per camera
960    if num_testbeds is None or testbed_index == _MAIN_TESTBED:
961      logging.info('Reporting camera %s ITS results to CtsVerifier', camera_id)
962      report_result(device_id, camera_id, results)
963    else:
964      write_result(testbed_index, device_id, camera_id, results)
965
966  logging.info('Test execution completed.')
967
968  # Power down tablet
969  if tablet_id:
970    cmd = f'adb -s {tablet_id} shell input keyevent KEYCODE_POWER'
971    subprocess.Popen(cmd.split())
972
973  if num_testbeds is not None:
974    if testbed_index == _MAIN_TESTBED:
975      logging.info('Waiting for all testbeds to finish.')
976      start = time.time()
977      completed_testbeds = set()
978      while time.time() < start + MERGE_RESULTS_TIMEOUT:
979        for i in range(num_testbeds):
980          if os.path.isfile(f'testbed_{i}_completed.tmp'):
981            start = time.time()
982            completed_testbeds.add(i)
983        # Already reported _MAIN_TESTBED's results.
984        if len(completed_testbeds) == num_testbeds - 1:
985          logging.info('All testbeds completed, merging results.')
986          for parsed_id, parsed_camera, parsed_results in (
987              parse_testbeds(completed_testbeds)):
988            logging.debug('Parsed id: %s, parsed cam: %s, parsed results: %s',
989                          parsed_id, parsed_camera, parsed_results)
990            if not are_devices_similar(device_id, parsed_id):
991              logging.error('Device %s and device %s are not the same '
992                            'model/type/build/revision.',
993                            device_id, parsed_id)
994              return
995            report_result(device_id, parsed_camera, parsed_results)
996          for temp_file in glob.glob('testbed_*.tmp'):
997            os.remove(temp_file)
998          break
999      else:
1000        logging.error('No testbeds finished in the last %d seconds, '
1001                      'but still expected data. '
1002                      'Completed testbed indices: %s, '
1003                      'expected number of testbeds: %d',
1004                      MERGE_RESULTS_TIMEOUT, list(completed_testbeds),
1005                      num_testbeds)
1006    else:
1007      with open(f'testbed_{testbed_index}_completed.tmp', 'w') as _:
1008        pass
1009
# Script entry point: call main() only when this file is executed directly,
# not when it is imported as a module.
1010if __name__ == '__main__':
1011  main()
1012