• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2017, The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Atest Tradefed test runner class."""
16
17# pylint: disable=too-many-lines
18
19from __future__ import annotations
20from __future__ import print_function
21
22from abc import ABC, abstractmethod
23import dataclasses
24import enum
25from functools import partial
26import json
27import logging
28import os
29from pathlib import Path
30import re
31import select
32import shutil
33import socket
34import threading
35import time
36from typing import Any, Dict, List, Set, Tuple
37
38from atest import atest_configs
39from atest import atest_error
40from atest import atest_utils
41from atest import constants
42from atest import module_info
43from atest import result_reporter
44from atest import rollout_control
45from atest.atest_enum import DetectType, ExitCode
46from atest.coverage import coverage
47from atest.logstorage import logstorage_utils
48from atest.metrics import metrics
49from atest.test_finders import test_finder_utils
50from atest.test_finders import test_info
51from atest.test_finders.test_info import TestInfo
52from atest.test_runner_invocation import TestRunnerInvocation
53from atest.test_runners import test_runner_base as trb
54from atest.test_runners.event_handler import EventHandler
55
56POLL_FREQ_SECS = 10
57SOCKET_HOST = '127.0.0.1'
58SOCKET_QUEUE_MAX = 1
59SOCKET_BUFFER = 4096
60SELECT_TIMEOUT = 0.5
61
62# Env key for rolling subprocess output window height.
63_ROLLING_OUTPUT_WINDOW_HEIGHT_ENV_KEY = 'ATEST_ROLLING_OUTPUT_WINDOW_HEIGHT'
64
65# Socket Events of form FIRST_EVENT {JSON_DATA}\nSECOND_EVENT {JSON_DATA}
66# EVENT_RE has groups for the name and the data. "." does not match \n.
67EVENT_RE = re.compile(
68    r'\n*(?P<event_name>[A-Z_]+) (?P<json_data>{.*})(?=\n|.)*'
69)
70
71# Remove aapt from build dependency, use prebuilt version instead.
72EXEC_DEPENDENCIES = ('adb', 'fastboot')
73
74LOG_FOLDER_NAME = 'log'
75
76_INTEGRATION_FINDERS = frozenset(['', 'INTEGRATION', 'INTEGRATION_FILE_PATH'])
77
78# AAPT binary name
79_AAPT = 'aapt'
80
81# The exist code mapping of tradefed.
82_TF_EXIT_CODE = [
83    'NO_ERROR',
84    'CONFIG_EXCEPTION',
85    'NO_BUILD',
86    'DEVICE_UNRESPONSIVE',
87    'DEVICE_UNAVAILABLE',
88    'FATAL_HOST_ERROR',
89    'THROWABLE_EXCEPTION',
90    'NO_DEVICE_ALLOCATED',
91    'WRONG_JAVA_VERSION',
92]
93
94# The environment variable for TF preparer incremental setup.
95_INCREMENTAL_SETUP_KEY = 'TF_PREPARER_INCREMENTAL_SETUP'
96
97
98class Error(Exception):
99  """Module-level error."""
100
101
102class TradeFedExitError(Error):
103  """Raised when TradeFed exists before test run has finished."""
104
105  def __init__(self, exit_code):
106    super().__init__()
107    self.exit_code = exit_code
108
109  def __str__(self):
110    tf_error_reason = self._get_exit_reason(self.exit_code)
111    return (
112        'TradeFed subprocess exited early with exit code='
113        f'{self.exit_code}({tf_error_reason}).'
114    )
115
116  def _get_exit_reason(self, exit_code):
117    if 0 < exit_code < len(_TF_EXIT_CODE):
118      return atest_utils.mark_red(_TF_EXIT_CODE[exit_code])
119    return 'Unknown exit status'
120
121
122class AtestTradefedTestRunner(trb.TestRunnerBase):
123  """TradeFed Test Runner class."""
124
125  NAME = 'AtestTradefedTestRunner'
126  EXECUTABLE = 'atest_tradefed.sh'
127  # Common base template used by all TF tests
128  _TF_LOCAL_MIN = 'template/atest_local_min'
129  # Base template used by the device tests (tests requires device to run)
130  _TF_DEVICE_TEST_TEMPLATE = 'template/atest_device_test_base'
131  # Base template used by the deviceless tests
132  _TF_DEVICELESS_TEST_TEMPLATE = 'template/atest_deviceless_test_base'
133  # Use --no-enable-granular-attempts to control reporter replay behavior.
134  # TODO(b/142630648): Enable option enable-granular-attempts
135  # in sharding mode.
136  _LOG_ARGS = (
137      '--{log_root_option_name}={log_path} '
138      '{log_ext_option} '
139      '--no-enable-granular-attempts'
140  )
141  _RUN_CMD = (
142      '{env} {exe} {template} '
143      '--template:map test=atest '
144      '--template:map log_saver={log_saver} '
145      '{tf_customize_template} {log_args} {args}'
146  )
147  _BUILD_REQ = {'tradefed-core'}
148  _RERUN_OPTION_GROUP = [
149      constants.ITERATIONS,
150      constants.RERUN_UNTIL_FAILURE,
151      constants.RETRY_ANY_FAILURE,
152  ]
153
154  # We're using a class attribute because we're recreating runner instances
155  # for different purposes throughout an invocation.
156  # TODO(b/283352341): Remove this once we refactor to have runner instances.
157  _MINIMAL_BUILD_TARGETS = set()
158
159  def __init__(
160      self,
161      results_dir: str,
162      extra_args: Dict[str, Any],
163      mod_info: module_info.ModuleInfo = None,
164      minimal_build: bool = False,
165      **kwargs,
166  ):
167    """Init stuff for base class."""
168    super().__init__(results_dir, **kwargs)
169    self.module_info = mod_info
170    self.log_path = os.path.join(results_dir, LOG_FOLDER_NAME)
171    # (b/275537997) results_dir could be '' in test_runner_handler; only
172    # mkdir when it is invoked by run_tests.
173    if results_dir:
174      Path(self.log_path).mkdir(parents=True, exist_ok=True)
175    self.log_args = {
176        'log_root_option_name': constants.LOG_ROOT_OPTION_NAME,
177        'log_ext_option': constants.LOG_SAVER_EXT_OPTION,
178        'log_path': self.log_path,
179        'proto_path': os.path.join(
180            self.results_dir, constants.ATEST_TEST_RECORD_PROTO
181        ),
182    }
183    self.run_cmd_dict = {
184        'env': '',
185        'exe': self.EXECUTABLE,
186        'template': self._TF_LOCAL_MIN,
187        'log_saver': constants.ATEST_TF_LOG_SAVER,
188        'tf_customize_template': '',
189        'args': '',
190        'log_args': self._LOG_ARGS.format(**self.log_args),
191    }
192    self.root_dir = os.environ.get(constants.ANDROID_BUILD_TOP)
193    self._is_host_enabled = extra_args.get(constants.HOST, False)
194    self._minimal_build = minimal_build
195    logging.debug('Enable minimal build: %s' % self._minimal_build)
196    metrics.LocalDetectEvent(
197        detect_type=DetectType.IS_MINIMAL_BUILD, result=int(self._minimal_build)
198    )
199    self._smart_test_selection = extra_args.get(
200        constants.SMART_TEST_SELECTION, False
201    )
202
203  def requires_device_update(
204      self, test_infos: List[test_info.TestInfo]
205  ) -> bool:
206    """Checks whether this runner requires device update."""
207
208    requires_device_update = False
209    for info in test_infos:
210      test = self._create_test(info)
211      requires_device_update |= test.requires_device_update()
212
213    return requires_device_update
214
215  # @typing.override
216  def create_invocations(
217      self,
218      extra_args: Dict[str, Any],
219      test_infos: List[test_info.TestInfo],
220  ) -> List[TestRunnerInvocation]:
221    """Create separate test runner invocations for device and deviceless tests.
222
223    Args:
224        extra_args: Dict of extra args to pass to the invocations
225        test_infos: A list of TestInfos.
226
227    Returns:
228        A list of TestRunnerInvocation instances.
229    """
230    invocations = []
231    device_test_info_lists, deviceless_test_info_lists = self._partition_tests(
232        test_infos
233    )
234    if deviceless_test_info_lists:
235      extra_args_for_deviceless_test = extra_args.copy()
236      extra_args_for_deviceless_test.update({constants.HOST: True})
237      for temp_test_infos in deviceless_test_info_lists:
238        invocations.append(
239            TestRunnerInvocation(
240                test_runner=self,
241                extra_args=extra_args_for_deviceless_test,
242                test_infos=temp_test_infos,
243            )
244        )
245    if device_test_info_lists:
246      extra_args_for_device_test = extra_args.copy()
247      if rollout_control.tf_preparer_incremental_setup.is_enabled():
248        extra_args_for_device_test.update({_INCREMENTAL_SETUP_KEY: True})
249      for temp_test_infos in device_test_info_lists:
250        invocations.append(
251            TestRunnerInvocation(
252                test_runner=self,
253                extra_args=extra_args_for_device_test,
254                test_infos=temp_test_infos,
255            )
256        )
257
258    return invocations
259
260  def _partition_tests(
261      self,
262      test_infos: List[test_info.TestInfo],
263  ) -> (List[List[test_info.TestInfo]], List[List[test_info.TestInfo]]):
264    """Partition input tests into two lists based on whether it requires device.
265
266    Args:
267        test_infos: A list of TestInfos.
268
269    Returns:
270        Two lists one contains device test info lists the other contains
271        deviceless test info lists.
272    """
273    device_test_infos = []
274    deviceless_test_infos = []
275
276    for info in test_infos:
277      test = self._create_test(info)
278      if test.requires_device():
279        device_test_infos.append(info)
280      else:
281        deviceless_test_infos.append(info)
282
283    return [
284        [info] for info in device_test_infos
285    ] if self._smart_test_selection or not device_test_infos else [
286        device_test_infos
287    ], [
288        [info] for info in deviceless_test_infos
289    ] if self._smart_test_selection or not deviceless_test_infos else [
290        deviceless_test_infos
291    ]
292
293  def _try_set_gts_authentication_key(self):
294    """Set GTS authentication key if it is available or exists.
295
296    Strategy:
297        Get APE_API_KEY from os.environ:
298            - If APE_API_KEY is already set by user -> do nothing.
299        Get the APE_API_KEY from constants:
300            - If the key file exists -> set to env var.
301        If APE_API_KEY isn't set and the key file doesn't exist:
302            - Warn user some GTS tests may fail without authentication.
303    """
304    if os.environ.get('APE_API_KEY'):
305      logging.debug('APE_API_KEY is set by developer.')
306      return
307    ape_api_key = constants.GTS_GOOGLE_SERVICE_ACCOUNT
308    key_path = os.path.join(self.root_dir, ape_api_key)
309    if ape_api_key and os.path.exists(key_path):
310      logging.debug('Set APE_API_KEY: %s', ape_api_key)
311      os.environ['APE_API_KEY'] = key_path
312    else:
313      logging.debug(
314          'APE_API_KEY not set, some GTS tests may fail without authentication.'
315      )
316
317  def run_tests(self, test_infos, extra_args, reporter):
318    """Run the list of test_infos. See base class for more.
319
320    Args:
321        test_infos: A list of TestInfos.
322        extra_args: Dict of extra args to add to test run.
323        reporter: An instance of result_report.ResultReporter.
324
325    Returns:
326        0 if tests succeed, non-zero otherwise.
327    """
328    logging.debug('TF test runner running tests %s', test_infos)
329    reporter.log_path = self.log_path
330    reporter.rerun_options = self._extract_rerun_options(extra_args)
331    # Set google service key if it's available or found before
332    # running tests.
333    self._try_set_gts_authentication_key()
334    result = 0
335    upload_start = time.time()
336    invocation_properties = {'atest_run_id': metrics.get_run_id()}
337
338    # Set crystalball_ingest property if there are performance tests.
339    is_perf_tests = False
340    for info in test_infos:
341      if 'performance-tests' in info.compatibility_suites:
342        is_perf_tests = True
343        break
344    if is_perf_tests:
345      invocation_properties['crystalball_ingest'] = 'yes'
346
347    creds, inv = (
348        logstorage_utils.do_upload_flow(extra_args, invocation_properties)
349        if logstorage_utils.is_upload_enabled(extra_args)
350        else (None, None)
351    )
352    metrics.LocalDetectEvent(
353        detect_type=DetectType.UPLOAD_FLOW_MS,
354        result=int((time.time() - upload_start) * 1000),
355    )
356    try:
357      verify_key = atest_utils.get_verify_key(
358          [test_infos[0].test_name], extra_args
359      )
360      # Change CWD to repo root to ensure TF can find prebuilt SDKs
361      # for some path-sensitive tests like robolectric.
362      os.chdir(os.path.abspath(os.getenv(constants.ANDROID_BUILD_TOP)))
363
364      # Copy symbols if there are tests belong to native test.
365      self._handle_native_tests(test_infos)
366
367      if os.getenv(trb.OLD_OUTPUT_ENV_VAR):
368        result = self.run_tests_raw(test_infos, extra_args, reporter)
369      else:
370        result = self.run_tests_pretty(test_infos, extra_args, reporter)
371    except atest_error.DryRunVerificationError as e:
372      atest_utils.colorful_print(str(e), constants.RED)
373      return ExitCode.VERIFY_FAILURE
374    finally:
375      if inv:
376        try:
377          logging.disable(logging.INFO)
378          # Always set invocation status to completed due to the ATest
379          # handle whole process by its own.
380          inv['schedulerState'] = 'completed'
381          logstorage_utils.BuildClient(creds).update_invocation(inv)
382          reporter.test_result_link = (
383              constants.RESULT_LINK % inv['invocationId']
384          )
385          # TODO(b/400764778): Modify the logic of result_reporter to contain a
386          # mapping of invocations to test result links, and report the mapping
387          # in `ResultReporter.print_summary`.
388          print(
389              'Test Result uploaded to %s'
390              % atest_utils.mark_green(reporter.test_result_link)
391          )
392        finally:
393          logging.disable(logging.NOTSET)
394    return result
395
396  def run_tests_raw(self, test_infos, extra_args, reporter):
397    """Run the list of test_infos. See base class for more.
398
399    Args:
400        test_infos: A list of TestInfos.
401        extra_args: Dict of extra args to add to test run.
402        reporter: An instance of result_report.ResultReporter.
403
404    Returns:
405        0 if tests succeed, non-zero otherwise.
406    """
407    reporter.register_unsupported_runner(self.NAME)
408
409    ret_code = ExitCode.SUCCESS
410    run_cmds = self.generate_run_commands(test_infos, extra_args)
411    logging.debug('Running test: %s', run_cmds[0])
412    subproc = self.run(
413        run_cmds[0],
414        output_to_stdout=True,
415        env_vars=self.generate_env_vars(extra_args),
416    )
417    ret_code |= self.wait_for_subprocess(subproc)
418    return ret_code
419
420  def run_tests_pretty(self, test_infos, extra_args, reporter):
421    """Run the list of test_infos. See base class for more.
422
423    Args:
424        test_infos: A list of TestInfos.
425        extra_args: Dict of extra args to add to test run.
426        reporter: An instance of result_report.ResultReporter.
427
428    Returns:
429        0 if tests succeed, non-zero otherwise.
430    """
431    ret_code = ExitCode.SUCCESS
432    server = self._start_socket_server()
433    run_cmds = self.generate_run_commands(
434        test_infos, extra_args, server.getsockname()[1]
435    )
436    is_rolling_output = (
437        not extra_args.get(constants.VERBOSE, False)
438        and atest_utils.is_atty_terminal()
439        and rollout_control.rolling_tf_subprocess_output.is_enabled()
440    )
441
442    logging.debug('Running test: %s', run_cmds[0])
443    subproc = self.run(
444        run_cmds[0],
445        output_to_stdout=extra_args.get(constants.VERBOSE, False),
446        env_vars=self.generate_env_vars(extra_args),
447        rolling_output_lines=is_rolling_output,
448    )
449
450    if is_rolling_output:
451      height = os.environ.get(_ROLLING_OUTPUT_WINDOW_HEIGHT_ENV_KEY, None)
452      if height:
453        try:
454          height = int(height)
455        except ValueError:
456          atest_utils.print_and_log_warning(
457              'Invalid rolling output window height: %s', height
458          )
459      threading.Thread(
460          target=atest_utils.stream_io_output,
461          args=(
462              subproc.stdout,
463              height if height else atest_utils.DEFAULT_OUTPUT_ROLLING_LINES,
464          ),
465      ).start()
466
467    self.handle_subprocess(
468        subproc,
469        partial(self._start_monitor, server, subproc, reporter, extra_args),
470    )
471    server.close()
472    ret_code |= self.wait_for_subprocess(subproc)
473    return ret_code
474
475  # pylint: disable=too-many-branches
476  # pylint: disable=too-many-locals
477  def _start_monitor(self, server, tf_subproc, reporter, extra_args):
478    """Polling and process event.
479
480    Args:
481        server: Socket server object.
482        tf_subproc: The tradefed subprocess to poll.
483        reporter: Result_Reporter object.
484        extra_args: Dict of extra args to add to test run.
485    """
486    inputs = [server]
487    event_handlers = {}
488    data_map = {}
489    inv_socket = None
490    while inputs:
491      try:
492        readable, _, _ = select.select(inputs, [], [], SELECT_TIMEOUT)
493        for socket_object in readable:
494          if socket_object is server:
495            conn, addr = socket_object.accept()
496            logging.debug('Accepted connection from %s', addr)
497            conn.setblocking(False)
498            inputs.append(conn)
499            data_map[conn] = ''
500            # The First connection should be invocation
501            # level reporter.
502            if not inv_socket:
503              inv_socket = conn
504          else:
505            # Count invocation level reporter events
506            # without showing real-time information.
507            if inv_socket == socket_object:
508              reporter.silent = True
509              event_handler = event_handlers.setdefault(
510                  socket_object, EventHandler(reporter, self.NAME)
511              )
512            else:
513              event_handler = event_handlers.setdefault(
514                  socket_object,
515                  EventHandler(
516                      result_reporter.ResultReporter(
517                          collect_only=extra_args.get(
518                              constants.COLLECT_TESTS_ONLY
519                          ),
520                      ),
521                      self.NAME,
522                  ),
523              )
524            recv_data = self._process_connection(
525                data_map, socket_object, event_handler
526            )
527            if not recv_data:
528              inputs.remove(socket_object)
529              socket_object.close()
530      finally:
531        # Subprocess ended and all socket clients were closed.
532        if tf_subproc.poll() is not None and len(inputs) == 1:
533          inputs.pop().close()
534          if not reporter.all_test_results:
535            if atest_configs.GLOBAL_ARGS.user_type:
536              atest_utils.colorful_print(
537                  "The test module doesn't support "
538                  f"'{atest_configs.GLOBAL_ARGS.user_type}' "
539                  'user type, please check test config.',
540                  constants.RED,
541              )
542            atest_utils.colorful_print(
543                r'No test results available. TradeFed did not find'
544                r' any test cases to run. This is possibly due to'
545                r' the no tests matching the current test filters'
546                r' or misconfigured AndroidTest.xml. Test Logs'
547                r' are saved in '
548                f'{reporter.log_path}.',
549                constants.RED,
550                constants.WHITE,
551            )
552          if not data_map:
553            metrics.LocalDetectEvent(
554                detect_type=DetectType.TF_EXIT_CODE,
555                result=tf_subproc.returncode,
556            )
557            raise TradeFedExitError(tf_subproc.returncode)
558          self._handle_log_associations(event_handlers)
559
560  def _process_connection(self, data_map, conn, event_handler):
561    """Process a socket connection between TF and ATest.
562
563    Expect data of form EVENT_NAME {JSON_DATA}.  Multiple events will be
564    \n deliminated.  Need to buffer data in case data exceeds socket
565    buffer.
566    E.q.
567        TEST_RUN_STARTED {runName":"hello_world_test","runAttempt":0}\n
568        TEST_STARTED {"start_time":2172917, "testName":"PrintHelloWorld"}\n
569    Args:
570        data_map: The data map of all connections.
571        conn: Socket connection.
572        event_handler: EventHandler object.
573
574    Returns:
575        True if conn.recv() has data , False otherwise.
576    """
577    # Set connection into blocking mode.
578    conn.settimeout(None)
579    data = conn.recv(SOCKET_BUFFER)
580    if isinstance(data, bytes):
581      data = data.decode()
582    logging.debug('received: %s', data)
583    if data:
584      data_map[conn] += data
585      while True:
586        match = EVENT_RE.match(data_map[conn])
587        if not match:
588          break
589        try:
590          event_data = json.loads(match.group('json_data'))
591        except ValueError:
592          logging.debug('Json incomplete, wait for more data')
593          break
594        event_name = match.group('event_name')
595        event_handler.process_event(event_name, event_data)
596        data_map[conn] = data_map[conn][match.end() :]
597    return bool(data)
598
599  def _start_socket_server(self):
600    """Start a TCP server."""
601    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
602    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
603    # Port 0 lets the OS pick an open port between 1024 and 65535.
604    server.bind((SOCKET_HOST, 0))
605    server.listen(SOCKET_QUEUE_MAX)
606    server.settimeout(POLL_FREQ_SECS)
607    logging.debug('Socket server started on port %s', server.getsockname()[1])
608    return server
609
610  def generate_env_vars(self, extra_args):
611    """Convert extra args and test_infos into env vars.
612
613    Args:
614        extra_args: Dict of extra args to add to test run.
615        test_infos: A list of TestInfos.
616
617    Returns:
618        A dict modified from os.getenv.copy().
619    """
620    env_vars = os.environ.copy()
621    if constants.TF_GLOBAL_CONFIG and is_log_upload_enabled(extra_args):
622      env_vars['TF_GLOBAL_CONFIG'] = constants.TF_GLOBAL_CONFIG
623    debug_port = extra_args.get(constants.TF_DEBUG, '')
624    if debug_port:
625      env_vars['TF_DEBUG'] = 'true'
626      env_vars['TF_DEBUG_PORT'] = str(debug_port)
627
628    filtered_paths = []
629    for path in str(env_vars.get('PYTHONPATH', '')).split(':'):
630      # TODO (b/166216843) Remove the hacky PYTHON path workaround.
631      if (
632          str(path).startswith('/tmp/Soong.python_')
633          and str(path).find('googleapiclient') > 0
634      ):
635        continue
636      filtered_paths.append(path)
637    if filtered_paths:
638      env_vars['PYTHONPATH'] = ':'.join(filtered_paths)
639
640    # Use prebuilt aapt if there's no aapt under android system path which
641    # is aligned with build system.
642    # https://android.googlesource.com/platform/build/+/master/core/config.mk#529
643    if self._is_missing_exec(_AAPT):
644      prebuilt_aapt = Path.joinpath(
645          atest_utils.get_prebuilt_sdk_tools_dir(), _AAPT
646      )
647      if os.path.exists(prebuilt_aapt):
648        env_vars['PATH'] = str(prebuilt_aapt.parent) + ':' + env_vars['PATH']
649
650    # Add an env variable for the classpath that only contains the host jars
651    # required for the tests we'll be running.
652    if self._minimal_build:
653      self._generate_host_jars_env_var(env_vars)
654
655    return env_vars
656
657  def _generate_host_jars_env_var(self, env_vars):
658    def is_host_jar(p):
659      return p.suffix == '.jar' and p.is_relative_to(
660          Path(os.getenv(constants.ANDROID_HOST_OUT))
661      )
662
663    all_host_jars = []
664
665    for target in AtestTradefedTestRunner._MINIMAL_BUILD_TARGETS:
666      if target.variant != Variant.HOST:
667        continue
668      # Only use the first host jar because the same jar may be installed
669      # to multiple places.
670      module_host_jars = [
671          p
672          for p in self.module_info.get_installed_paths(target.module_name)
673          if is_host_jar(p)
674      ]
675      all_host_jars.extend(
676          [str(module_host_jars[0])] if module_host_jars else []
677      )
678
679    env_vars['ATEST_HOST_JARS'] = ':'.join(set(all_host_jars))
680    logging.debug(
681        'Set env ATEST_HOST_JARS: %s.', env_vars.get('ATEST_HOST_JARS')
682    )
683
684  # pylint: disable=unnecessary-pass
685  # Please keep above disable flag to ensure host_env_check is overridden.
686  def host_env_check(self):
687    """Check that host env has everything we need.
688
689    We actually can assume the host env is fine because we have the same
690    requirements that atest has. Update this to check for android env vars
691    if that changes.
692    """
693    pass
694
695  @staticmethod
696  def _is_missing_exec(executable):
697    """Check if system build executable is available.
698
699    Args:
700        executable: Executable we are checking for.
701
702    Returns:
703        True if executable is missing, False otherwise.
704    """
705    output = shutil.which(executable)
706    if not output:
707      return True
708    # TODO: Check if there is a clever way to determine if system adb is
709    # good enough.
710    root_dir = os.environ.get(constants.ANDROID_BUILD_TOP, '')
711    return os.path.commonprefix([output, root_dir]) != root_dir
712
713  def _use_minimal_build(self, test_infos: List[test_info.TestInfo]) -> bool:
714
715    if not self._minimal_build:
716      return False
717
718    unsupported = set()
719    for t_info in test_infos:
720      if t_info.test_finder in [
721          'CONFIG',
722          'INTEGRATION',
723          'INTEGRATION_FILE_PATH',
724      ]:
725        unsupported.add(t_info.test_name)
726      # For ltp and kselftest, keep it as no-minimal-build.
727      elif t_info.test_name in (
728          constants.REQUIRED_LTP_TEST_MODULES
729          + constants.REQUIRED_KSELFTEST_TEST_MODULES
730      ):
731        unsupported.add(t_info.test_name)
732
733    if not unsupported:
734      return True
735
736    atest_utils.print_and_log_warning(
737        'Minimal build was disabled because the following tests do not support'
738        ' it: %s',
739        unsupported,
740    )
741    return False
742
743  def get_test_runner_build_reqs(
744      self, test_infos: List[test_info.TestInfo]
745  ) -> Set[str]:
746    """Return the build requirements.
747
748    Args:
749        test_infos: List of TestInfo.
750
751    Returns:
752        Set of build targets.
753    """
754    if self._use_minimal_build(test_infos):
755      return self._get_test_runner_reqs_minimal(test_infos)
756
757    return self._get_test_runner_build_reqs_maximal(test_infos)
758
759  def _get_test_runner_build_reqs_maximal(
760      self, test_infos: List[test_info.TestInfo]
761  ) -> Set[str]:
762    build_req = self._BUILD_REQ.copy()
763    # Use different base build requirements if google-tf is around.
764    if self.module_info.is_module(constants.GTF_MODULE):
765      build_req = {constants.GTF_TARGET}
766    # Always add ATest's own TF target.
767    build_req.add(constants.ATEST_TF_MODULE)
768    # Add adb if we can't find it.
769    for executable in EXEC_DEPENDENCIES:
770      if self._is_missing_exec(executable):
771        if self.module_info.is_module(executable):
772          build_req.add(executable)
773
774    # Force rebuilt all jars under $ANDROID_HOST_OUT to prevent old version
775    # host jars break the test.
776    build_req |= self._get_host_framework_targets()
777
778    build_req |= trb.gather_build_targets(test_infos)
779    return build_req
780
781  def _get_test_runner_reqs_minimal(
782      self, test_infos: List[test_info.TestInfo]
783  ) -> Set[str]:
784
785    build_targets = set()
786    runtime_targets = set()
787
788    for info in test_infos:
789      test = self._create_test(info)
790      build_targets.update(test.query_build_targets())
791      runtime_targets.update(test.query_runtime_targets())
792
793    AtestTradefedTestRunner._MINIMAL_BUILD_TARGETS = runtime_targets
794
795    build_targets = {t.name() for t in build_targets}
796
797    return build_targets
798
799  def _create_test(self, t_info: test_info.TestInfo) -> Test:
800
801    info = self.module_info.get_module_info(t_info.raw_test_name)
802
803    if not info:
804      # In cases module info does not exist (e.g. TF integration tests), use the
805      # TestInfo to determine the test type. In the future we should ensure all
806      # tests have their corresponding module info and only rely on the module
807      # info to determine the test type.
808      atest_utils.print_and_log_warning(
809          'Could not find module information for %s', t_info.raw_test_name
810      )
811      return self._guess_test_type_for_missing_module(t_info)
812
813    def _select_variant(info):
814      variants = self.module_info.build_variants(info)
815      if len(variants) < 2:
816        return Variant.HOST if variants[0] == 'HOST' else Variant.DEVICE
817      return Variant.HOST if self._is_host_enabled else Variant.DEVICE
818
819    if not self._is_host_enabled and self.module_info.requires_device(info):
820      return DeviceTest(info, _select_variant(info), t_info.mainline_modules)
821
822    return DevicelessTest(info, _select_variant(info))
823
824  def _guess_test_type_for_missing_module(
825      self, t_info: test_info.TestInfo
826  ) -> Test:
827    """Determine the test type (device or deviceless) without module info."""
828    if (
829        not self._is_host_enabled
830        and t_info.get_supported_exec_mode() != constants.DEVICELESS_TEST
831    ):
832      return DeviceTest(None, Variant.DEVICE, t_info.mainline_modules)
833
834    return DevicelessTest(None, Variant.HOST)
835
836  def _get_host_framework_targets(self) -> Set[str]:
837    """Get the build targets for all the existing jars under host framework.
838
839    Returns:
840        A set of build target name under $(ANDROID_HOST_OUT)/framework.
841    """
842    host_targets = set()
843    if not self.module_info:
844      return host_targets
845
846    framework_host_dir = Path(
847        os.environ.get(constants.ANDROID_HOST_OUT)
848    ).joinpath('framework')
849    if framework_host_dir.is_dir():
850      jars = framework_host_dir.glob('*.jar')
851      for jar in jars:
852        if self.module_info.is_module(jar.stem):
853          host_targets.add(jar.stem)
854      logging.debug('Found exist host framework target:%s', host_targets)
855    return host_targets
856
857  def _parse_extra_args(self, test_infos, extra_args):
858    """Convert the extra args into something tf can understand.
859
860    Args:
861        extra_args: Dict of args
862
863    Returns:
864        Tuple of args to append and args not supported.
865    """
866    args_to_append, args_not_supported = extra_args_to_tf_args(
867        extra_args, self.module_info
868    )
869
870    # Set exclude instant app annotation for non-instant mode run.
871    if constants.INSTANT not in extra_args and self._has_instant_app_config(
872        test_infos, self.module_info
873    ):
874      args_to_append.append(constants.TF_TEST_ARG)
875      args_to_append.append(
876          '{tf_class}:{option_name}:{option_value}'.format(
877              tf_class=constants.TF_AND_JUNIT_CLASS,
878              option_name=constants.TF_EXCLUDE_ANNOTATE,
879              option_value=constants.INSTANT_MODE_ANNOTATE,
880          )
881      )
882    # Force append --enable-parameterized-modules if args_to_append has
883    # --module-parameter in args_to_append
884    if constants.TF_MODULE_PARAMETER in args_to_append:
885      if constants.TF_ENABLE_PARAMETERIZED_MODULES not in args_to_append:
886        args_to_append.append(constants.TF_ENABLE_PARAMETERIZED_MODULES)
887    # If all the test config has config with auto enable parameter, force
888    # exclude those default parameters(ex: instant_app, secondary_user)
889    # TODO: (b/228433541) Remove the limitation after the root cause fixed.
890    if len(test_infos) <= 1 and self._is_all_tests_parameter_auto_enabled(
891        test_infos
892    ):
893      if constants.TF_ENABLE_PARAMETERIZED_MODULES not in args_to_append:
894        args_to_append.append(constants.TF_ENABLE_PARAMETERIZED_MODULES)
895        for exclude_parameter in constants.DEFAULT_EXCLUDE_PARAS:
896          args_to_append.append('--exclude-module-parameters')
897          args_to_append.append(exclude_parameter)
898    return args_to_append, args_not_supported
899
900  def generate_run_commands(self, test_infos, extra_args, port=None):
901    """Generate a single run command from TestInfos.
902
903    Args:
904        test_infos: A list of TestInfo instances.
905        extra_args: A Dict of extra args to append.
906        port: Optional. An int of the port number to send events to. If None,
907          then subprocess reporter in TF won't try to connect.
908
909    Returns:
910        A list that contains the string of atest tradefed run command.
911        Only one command is returned.
912    """
913    if any(
914        'performance-tests' in info.compatibility_suites for info in test_infos
915    ):
916      self.run_cmd_dict['template'] = 'template/performance-tests-base'
917    elif extra_args.get(constants.USE_TF_MIN_BASE_TEMPLATE):
918      self.run_cmd_dict['template'] = self._TF_LOCAL_MIN
919    else:
920      self.run_cmd_dict['template'] = (
921          self._TF_DEVICELESS_TEST_TEMPLATE
922          if extra_args.get(constants.HOST)
923          else self._TF_DEVICE_TEST_TEMPLATE
924      )
925
926    args = self._create_test_args(test_infos, extra_args)
927
928    # Create a copy of args as more args could be added to the list.
929    test_args = list(args)
930    if port:
931      test_args.extend(['--subprocess-report-port', str(port)])
932    if extra_args.get(constants.INVOCATION_ID, None):
933      test_args.append(
934          '--invocation-data invocation_id=%s'
935          % extra_args[constants.INVOCATION_ID]
936      )
937    if extra_args.get(constants.WORKUNIT_ID, None):
938      test_args.append(
939          '--invocation-data work_unit_id=%s'
940          % extra_args[constants.WORKUNIT_ID]
941      )
942    if extra_args.get(constants.LOCAL_BUILD_ID, None):
943      # TODO: (b/207584685) Replace with TF local build solutions.
944      test_args.append('--use-stub-build true')
945      test_args.append(
946          '--stub-build-id %s' % extra_args[constants.LOCAL_BUILD_ID]
947      )
948      test_args.append(
949          '--stub-build-target %s' % extra_args[constants.BUILD_TARGET]
950      )
951    for info in test_infos:
952      if atest_utils.get_test_and_mainline_modules(info.test_name):
953        # TODO(b/253641058) Remove this once mainline module
954        # binaries are stored under testcase directory.
955        if not extra_args.get(constants.DRY_RUN):
956          self._copy_mainline_module_binary(info.mainline_modules)
957        test_args.append(constants.TF_ENABLE_MAINLINE_PARAMETERIZED_MODULES)
958        break
959    # For detailed logs, set TF options log-level/log-level-display as
960    # 'VERBOSE' by default.
961    log_level = 'VERBOSE'
962    test_args.extend(['--log-level-display', log_level])
963    test_args.extend(['--log-level', log_level])
964
965    # Set no-early-device-release by default to speed up TF teardown time.
966    # TODO(b/300882567) remove this forever when it's the default behavor.
967    test_args.extend(['--no-early-device-release'])
968
969    args_to_add, args_not_supported = self._parse_extra_args(
970        test_infos, extra_args
971    )
972
973    # If multiple devices in test config, automatically append
974    # --replicate-parent-setup and --multi-device-count
975    device_count = atest_configs.GLOBAL_ARGS.device_count_config
976    if device_count and device_count > 1:
977      args_to_add.append('--replicate-parent-setup')
978      args_to_add.append('--multi-device-count')
979      args_to_add.append(str(device_count))
980      os.environ.pop(constants.ANDROID_SERIAL, None)
981    else:
982      # TODO(b/122889707) Remove this after finding the root cause.
983      env_serial = os.environ.get(constants.ANDROID_SERIAL)
984      # Use the env variable ANDROID_SERIAL if it's set by user but only
985      # when the target tests are not deviceless tests.
986      if (
987          env_serial
988          and '--serial' not in args_to_add
989          and '-n' not in args_to_add
990      ):
991        args_to_add.append('--serial')
992        args_to_add.append(env_serial)
993
994    test_args.extend(args_to_add)
995    if args_not_supported:
996      atest_utils.print_and_log_info(
997          '%s does not support the following args %s',
998          self.EXECUTABLE,
999          args_not_supported,
1000      )
1001
1002    # Only need to check one TestInfo to determine if the tests are
1003    # configured in TEST_MAPPING.
1004    for_test_mapping = test_infos and test_infos[0].from_test_mapping
1005    if is_log_upload_enabled(extra_args):
1006      test_args.extend(atest_utils.get_result_server_args(for_test_mapping))
1007    self.run_cmd_dict['args'] = ' '.join(test_args)
1008    self.run_cmd_dict['tf_customize_template'] = (
1009        self._extract_customize_tf_templates(extra_args)
1010    )
1011
1012    # By default using ATestFileSystemLogSaver no matter what running under
1013    # aosp or internal branches. Only switch using google log saver if user
1014    # tend to upload test result to AnTS which could be detected by the
1015    # invocation_id in extra args.
1016    if is_log_upload_enabled(extra_args):
1017      self.use_google_log_saver()
1018
1019    run_commands = [self._RUN_CMD.format(**self.run_cmd_dict)]
1020    logging.debug('TF test runner generated run commands %s', run_commands)
1021    return run_commands
1022
1023  def _flatten_test_infos(self, test_infos):
1024    """Sort and group test_infos by module_name and sort and group filters
1025
1026    by class name.
1027
1028        Example of three test_infos in a set:
1029            Module1, {(classA, {})}
1030            Module1, {(classB, {Method1})}
1031            Module1, {(classB, {Method2}}
1032        Becomes a set with one element:
1033            Module1, {(ClassA, {}), (ClassB, {Method1, Method2})}
1034        Where:
1035              Each line is a test_info namedtuple
1036              {} = Frozenset
1037              () = TestFilter namedtuple
1038
1039    Args:
1040        test_infos: A list of TestInfo namedtuples.
1041
1042    Returns:
1043        A list of TestInfos flattened.
1044    """
1045    results = []
1046    for module, group in atest_utils.sort_and_group(
1047        test_infos, lambda x: x.test_name
1048    ):
1049
1050      # module is a string, group is a generator of grouped TestInfos.
1051      # Module Test, so flatten test_infos:
1052      no_filters = False
1053      filters = set()
1054      test_runner = None
1055      test_finder = None
1056      build_targets = set()
1057      data = {}
1058      module_args = []
1059      for test_info_i in group:
1060        data.update(test_info_i.data)
1061        # Extend data with constants.TI_MODULE_ARG instead of
1062        # overwriting.
1063        module_args.extend(test_info_i.data.get(constants.TI_MODULE_ARG, []))
1064        test_runner = test_info_i.test_runner
1065        test_finder = test_info_i.test_finder
1066        build_targets |= test_info_i.build_targets
1067        test_filters = test_info_i.data.get(constants.TI_FILTER)
1068        if not test_filters or no_filters:
1069          # test_info wants whole module run, so hardcode no filters.
1070          no_filters = True
1071          filters = set()
1072          continue
1073        filters |= test_filters
1074      if module_args:
1075        data[constants.TI_MODULE_ARG] = module_args
1076      data[constants.TI_FILTER] = self.flatten_test_filters(filters)
1077      results.append(
1078          test_info.TestInfo(
1079              test_name=module,
1080              test_runner=test_runner,
1081              test_finder=test_finder,
1082              build_targets=build_targets,
1083              data=data,
1084          )
1085      )
1086    return results
1087
1088  @staticmethod
1089  def flatten_test_filters(filters):
1090    """Sort and group test_filters by class_name.
1091
1092        Example of three test_filters in a frozenset:
1093            classA, {}
1094            classB, {Method1}
1095            classB, {Method2}
1096        Becomes a frozenset with these elements:
1097            classA, {}
1098            classB, {Method1, Method2}
1099        Where:
1100            Each line is a TestFilter namedtuple
1101            {} = Frozenset
1102
1103    Args:
1104        filters: A frozenset of test_filters.
1105
1106    Returns:
1107        A frozenset of test_filters flattened.
1108    """
1109    results = set()
1110    for class_name, group in atest_utils.sort_and_group(
1111        filters, lambda x: x.class_name
1112    ):
1113
1114      # class_name is a string, group is a generator of TestFilters
1115      assert class_name is not None
1116      methods = set()
1117      for test_filter in group:
1118        if not test_filter.methods:
1119          # Whole class should be run
1120          methods = set()
1121          break
1122        methods |= test_filter.methods
1123      results.add(test_info.TestFilter(class_name, frozenset(methods)))
1124    return frozenset(results)
1125
1126  def _is_all_tests_parameter_auto_enabled(self, test_infos):
1127    """Check if all the test infos are parameter auto enabled.
1128
1129    Args:
1130        test_infos: A set of TestInfo instances.
1131
1132    Returns: True if all tests are parameter auto enabled, False otherwise.
1133    """
1134    for info in test_infos:
1135      if not self._is_parameter_auto_enabled_cfg(info, self.module_info):
1136        return False
1137    return True
1138
1139  def _create_test_args(
1140      self, test_infos: list[TestInfo], extra_args: Dict[str, Any]
1141  ) -> list[str]:
1142    """Compile TF command line args based on the given test infos.
1143
1144    Args:
1145        test_infos: A list of TestInfo instances.
1146        extra_args: A Dict of extra args for test runners to utilize.
1147
1148    Returns: A list of TF arguments to run the tests.
1149    """
1150    args = []
1151    if not test_infos:
1152      return []
1153
1154    if atest_configs.GLOBAL_ARGS.group_test:
1155      test_infos = self._flatten_test_infos(test_infos)
1156
1157    has_integration_test = False
1158
1159    # Because current --include-filter arg will not working if ATest pass
1160    # both --module and --include-filter to TF, only test by --module will
1161    # be run. Make a check first, only use --module if all tests are all
1162    # parameter auto enabled.
1163    # Only auto-enable the parameter if there's only one test.
1164    # TODO: (b/228433541) Remove the limitation after the root cause fixed.
1165    use_module_arg = False
1166    if len(test_infos) <= 1:
1167      use_module_arg = self._is_all_tests_parameter_auto_enabled(test_infos)
1168
1169    for info in test_infos:
1170      # Integration test exists in TF's jar, so it must have the option
1171      # if it's integration finder.
1172      if info.test_finder in _INTEGRATION_FINDERS:
1173        has_integration_test = True
1174      # For non-parameterize test module, use --include-filter, but for
1175      # tests which have auto enable parameterize config use --module
1176      # instead.
1177      if use_module_arg and self._is_parameter_auto_enabled_cfg(
1178          info, self.module_info
1179      ):
1180        args.extend([constants.TF_MODULE_FILTER, info.test_name])
1181      else:
1182        args.extend([constants.TF_INCLUDE_FILTER, info.test_name])
1183      for option in info.data.get(constants.TI_MODULE_ARG, []):
1184        if constants.TF_INCLUDE_FILTER_OPTION == option[0]:
1185          suite_filter = constants.TF_SUITE_FILTER_ARG_VALUE_FMT.format(
1186              test_name=info.test_name, option_value=option[1]
1187          )
1188          args.extend([constants.TF_INCLUDE_FILTER, suite_filter])
1189        elif constants.TF_EXCLUDE_FILTER_OPTION == option[0]:
1190          suite_filter = constants.TF_SUITE_FILTER_ARG_VALUE_FMT.format(
1191              test_name=info.test_name, option_value=option[1]
1192          )
1193          args.extend([constants.TF_EXCLUDE_FILTER, suite_filter])
1194        else:
1195          module_arg = constants.TF_MODULE_ARG_VALUE_FMT.format(
1196              test_name=info.test_name,
1197              option_name=option[0],
1198              option_value=option[1],
1199          )
1200          args.extend([constants.TF_MODULE_ARG, module_arg])
1201
1202    # Add ATest include filter
1203    args.extend(
1204        get_include_filter(
1205            test_infos, extra_args.get(constants.TEST_FILTER, None)
1206        )
1207    )
1208
1209    # TODO (b/141090547) Pass the config path to TF to load configs.
1210    # Compile option in TF if finder is not INTEGRATION or not set.
1211    if not has_integration_test:
1212      args.append(constants.TF_SKIP_LOADING_CONFIG_JAR)
1213    return args
1214
1215  def _extract_rerun_options(self, extra_args):
1216    """Extract rerun options to a string for output.
1217
1218    Args:
1219        extra_args: Dict of extra args for test runners to use.
1220
1221    Returns: A string of rerun options.
1222    """
1223    extracted_options = [
1224        '{} {}'.format(arg, extra_args[arg])
1225        for arg in extra_args
1226        if arg in self._RERUN_OPTION_GROUP
1227    ]
1228    return ' '.join(extracted_options)
1229
1230  def _extract_customize_tf_templates(self, extra_args: dict[str]) -> str:
1231    """Extract tradefed template options to a string for output.
1232
1233    Args:
1234        extra_args: Dict of extra args for test runners to use.
1235
1236    Returns:
1237        A string of tradefed template options.
1238    """
1239    tf_templates = extra_args.get(constants.TF_TEMPLATE, [])
1240    return ' '.join(['--template:map %s' % x for x in tf_templates])
1241
1242  def _handle_log_associations(self, event_handlers):
1243    """Handle TF's log associations information data.
1244
1245    log_association dict:
1246    {'loggedFile': '/tmp/serial-util11375755456514097276.ser',
1247     'dataName': 'device_logcat_setup_127.0.0.1:58331',
1248     'time': 1602038599.856113},
1249
1250    Args:
1251        event_handlers: Dict of {socket_object:EventHandler}.
1252    """
1253    log_associations = []
1254    for _, event_handler in event_handlers.items():
1255      if event_handler.log_associations:
1256        log_associations += event_handler.log_associations
1257    device_test_end_log_time = ''
1258    device_teardown_log_time = ''
1259    for log_association in log_associations:
1260      if 'device_logcat_test' in log_association.get('dataName', ''):
1261        device_test_end_log_time = log_association.get('time')
1262      if 'device_logcat_teardown' in log_association.get('dataName', ''):
1263        device_teardown_log_time = log_association.get('time')
1264    if device_test_end_log_time and device_teardown_log_time:
1265      teardowntime = float(device_teardown_log_time) - float(
1266          device_test_end_log_time
1267      )
1268      logging.debug('TF logcat teardown time=%s seconds.', teardowntime)
1269      metrics.LocalDetectEvent(
1270          detect_type=DetectType.TF_TEARDOWN_LOGCAT, result=int(teardowntime)
1271      )
1272
1273  @staticmethod
1274  def _has_instant_app_config(test_infos, mod_info):
1275    """Check if one of the input tests defined instant app mode in config.
1276
1277    Args:
1278        test_infos: A set of TestInfo instances.
1279        mod_info: ModuleInfo object.
1280
1281    Returns: True if one of the tests set up instant app mode.
1282    """
1283    for tinfo in test_infos:
1284      test_config, _ = test_finder_utils.get_test_config_and_srcs(
1285          tinfo, mod_info
1286      )
1287      if test_config:
1288        parameters = atest_utils.get_config_parameter(test_config)
1289        if constants.TF_PARA_INSTANT_APP in parameters:
1290          return True
1291    return False
1292
1293  @staticmethod
1294  def _is_parameter_auto_enabled_cfg(tinfo, mod_info):
1295    """Check if input tests contains auto enable support parameters.
1296
1297    Args:
1298        test_infos: A set of TestInfo instances.
1299        mod_info: ModuleInfo object.
1300
1301    Returns: True if input test has parameter setting which is not in the
1302             exclude list.
1303    """
1304    test_config, _ = test_finder_utils.get_test_config_and_srcs(tinfo, mod_info)
1305    if test_config:
1306      parameters = atest_utils.get_config_parameter(test_config)
1307      if (
1308          parameters
1309          - constants.DEFAULT_EXCLUDE_PARAS
1310          - constants.DEFAULT_EXCLUDE_NOT_PARAS
1311      ):
1312        return True
1313    return False
1314
1315  def _handle_native_tests(self, test_infos):
1316    """Handling some extra tasks for running native tests from tradefed.
1317
1318    Args:
1319        test_infos: A set of TestInfo instances.
1320    """
1321    for tinfo in test_infos:
1322      test_config, _ = test_finder_utils.get_test_config_and_srcs(
1323          tinfo, self.module_info
1324      )
1325      if test_config:
1326        module_name, device_path = atest_utils.get_config_gtest_args(
1327            test_config
1328        )
1329        if module_name and device_path:
1330          atest_utils.copy_native_symbols(module_name, device_path)
1331
1332  # TODO(b/253641058) remove copying files once mainline module
1333  # binaries are stored under testcase directory.
1334  def _copy_mainline_module_binary(self, mainline_modules):
1335    """Copies mainline module binaries to out/dist/mainline_modules_{arch}
1336
1337    Copies the mainline module binaries to the location that
1338    MainlineModuleHandler in TF expects since there is no way to
1339    explicitly tweak the search path.
1340
1341    Args:
1342        mainline_modules: A list of mainline modules.
1343    """
1344    config = atest_utils.get_android_config()
1345    arch = config.get('TARGET_ARCH')
1346    dest_dir = atest_utils.DIST_OUT_DIR.joinpath(f'mainline_modules_{arch}')
1347    dest_dir.mkdir(parents=True, exist_ok=True)
1348
1349    for module in mainline_modules:
1350      target_module_info = self.module_info.get_module_info(module)
1351      installed_paths = target_module_info[constants.MODULE_INSTALLED]
1352
1353      for installed_path in installed_paths:
1354        file_name = Path(installed_path).name
1355        dest_path = Path(dest_dir).joinpath(file_name)
1356        if dest_path.exists():
1357          atest_utils.colorful_print(
1358              'Replacing APEX in %s with %s' % (dest_path, installed_path),
1359              constants.CYAN,
1360          )
1361          logging.debug(
1362              'deleting the old file: %s and copy a new binary', dest_path
1363          )
1364          dest_path.unlink()
1365        shutil.copyfile(installed_path, dest_path)
1366
1367        break
1368
1369  def use_google_log_saver(self):
1370    """Replace the original log saver to google log saver."""
1371    self.log_args.update({
1372        'log_root_option_name': constants.GOOGLE_LOG_SAVER_LOG_ROOT_OPTION_NAME,
1373        'log_ext_option': constants.GOOGLE_LOG_SAVER_EXT_OPTION,
1374    })
1375    self.run_cmd_dict.update({
1376        'log_saver': constants.GOOGLE_LOG_SAVER,
1377        'log_args': self._LOG_ARGS.format(**self.log_args),
1378    })
1379
1380
1381def is_log_upload_enabled(extra_args: Dict[str, Any]) -> bool:
1382  """Check if input extra_args include google log saver related args.
1383
1384  Args:
1385      extra_args: Dict of args.
1386  """
1387  return bool(extra_args.get(constants.INVOCATION_ID, None))
1388
1389
1390def generate_annotation_filter_args(
1391    arg_value: Any,
1392    mod_info: module_info.ModuleInfo,
1393    test_infos: List[test_info.TestInfo],
1394) -> List[str]:
1395  """Generate TF annotation filter arguments.
1396
1397  Args:
1398      arg_value: Argument value for annotation filter.
1399      mod_info: ModuleInfo object.
1400      test_infos: A set of TestInfo instances.
1401
1402  Returns:
1403      List of TF annotation filter arguments.
1404  """
1405  annotation_filter_args = []
1406  for info in test_infos:
1407    test_name = info.test_name
1408    for keyword in arg_value:
1409      annotation = atest_utils.get_full_annotation_class_name(
1410          mod_info.get_module_info(test_name), keyword
1411      )
1412      if annotation:
1413        module_arg = constants.TF_MODULE_ARG_VALUE_FMT.format(
1414            test_name=test_name,
1415            option_name=constants.INCLUDE_ANNOTATION,
1416            option_value=annotation,
1417        )
1418        annotation_filter_args.extend([constants.TF_MODULE_ARG, module_arg])
1419      atest_utils.print_and_log_error(
1420          atest_utils.mark_red(f'Cannot find similar annotation: {keyword}')
1421      )
1422  return annotation_filter_args
1423
1424
1425def extra_args_to_tf_args(
1426    extra_args: Dict[str, Any], mod_info: module_info.ModuleInfo = None
1427) -> Tuple[Dict[str, Any], Dict[str, Any]]:
1428  """Convert the extra args into atest_tf_test_runner supported args.
1429
1430  Args:
1431      extra_args: Dict of args
1432      mod_info: ModuleInfo object.
1433
1434  Returns:
1435      Tuple of ARGS that atest_tf supported and not supported.
1436  """
1437  supported_args = []
1438  unsupported_args = []
1439
1440  def constant_list(*value):
1441    return lambda *_: value
1442
1443  # pylint: disable=unused-argument
1444  def print_message(message):
1445    def inner(*args):
1446      print(message)
1447      return []
1448
1449    return inner
1450
1451  # Mapping supported TF arguments to the processing function.
1452  supported_tf_args = dict({
1453      constants.WAIT_FOR_DEBUGGER: constant_list('--wait-for-debugger'),
1454      constants.DISABLE_INSTALL: constant_list('--disable-target-preparers'),
1455      constants.SERIAL: lambda arg_value: [
1456          j for d in arg_value for j in ('--serial', d)
1457      ],
1458      constants.SHARDING: lambda arg_value: ['--shard-count', str(arg_value)],
1459      constants.DISABLE_TEARDOWN: constant_list('--disable-teardown'),
1460      constants.HOST: constant_list(
1461          '-n', '--prioritize-host-config', '--skip-host-arch-check'
1462      ),
1463      constants.CUSTOM_ARGS:
1464      # custom args value is a list.
1465      lambda arg_value: arg_value,
1466      constants.ALL_ABI: constant_list('--all-abi'),
1467      constants.INSTANT: constant_list(
1468          constants.TF_ENABLE_PARAMETERIZED_MODULES,
1469          constants.TF_MODULE_PARAMETER,
1470          'instant_app',
1471      ),
1472      constants.USER_TYPE: lambda arg_value: [
1473          constants.TF_ENABLE_PARAMETERIZED_MODULES,
1474          '--enable-optional-parameterization',
1475          constants.TF_MODULE_PARAMETER,
1476          str(arg_value),
1477      ],
1478      constants.ITERATIONS: lambda arg_value: [
1479          '--retry-strategy',
1480          constants.ITERATIONS,
1481          '--max-testcase-run-count',
1482          str(arg_value),
1483      ],
1484      constants.RERUN_UNTIL_FAILURE: lambda arg_value: [
1485          '--retry-strategy',
1486          constants.RERUN_UNTIL_FAILURE,
1487          '--max-testcase-run-count',
1488          str(arg_value),
1489      ],
1490      constants.RETRY_ANY_FAILURE: lambda arg_value: [
1491          '--retry-strategy',
1492          constants.RETRY_ANY_FAILURE,
1493          '--max-testcase-run-count',
1494          str(arg_value),
1495      ],
1496      constants.COLLECT_TESTS_ONLY: constant_list('--collect-tests-only'),
1497      constants.TF_DEBUG: print_message('Please attach process to your IDE...'),
1498      constants.ANNOTATION_FILTER: generate_annotation_filter_args,
1499      constants.TEST_FILTER: lambda arg_value: [
1500          '--test-arg',
1501          (
1502              'com.android.tradefed.testtype.AndroidJUnitTest:'
1503              f'include-filter:{arg_value}'
1504          ),
1505          '--test-arg',
1506          (
1507              'com.android.tradefed.testtype.GTest:native-test-flag:'
1508              f'--gtest_filter={arg_value}'
1509          ),
1510          '--test-arg',
1511          (
1512              'com.android.tradefed.testtype.HostGTest:native-test-flag:'
1513              f'--gtest_filter={arg_value}'
1514          ),
1515      ],
1516      constants.TEST_TIMEOUT: lambda arg_value: [
1517          '--test-arg',
1518          (
1519              'com.android.tradefed.testtype.AndroidJUnitTest:'
1520              f'shell-timeout:{arg_value}'
1521          ),
1522          '--test-arg',
1523          (
1524              'com.android.tradefed.testtype.AndroidJUnitTest:'
1525              f'test-timeout:{arg_value}'
1526          ),
1527          '--test-arg',
1528          (
1529              'com.android.tradefed.testtype.HostGTest:'
1530              f'native-test-timeout:{arg_value}'
1531          ),
1532          '--test-arg',
1533          (
1534              'com.android.tradefed.testtype.GTest:'
1535              f'native-test-timeout:{arg_value}'
1536          ),
1537          '--test-arg',
1538          (
1539              'com.android.compatibility.testtype.LibcoreTest:'
1540              f'test-timeout:{arg_value}'
1541          ),
1542      ],
1543      constants.COVERAGE: lambda _: coverage.tf_args(mod_info),
1544      _INCREMENTAL_SETUP_KEY: constant_list('--incremental-setup=YES'),
1545  })
1546
1547  for arg in extra_args:
1548    if arg in supported_tf_args:
1549      tf_args = supported_tf_args[arg](extra_args[arg])
1550      if tf_args:
1551        supported_args.extend(tf_args)
1552      continue
1553
1554    if arg in (
1555        constants.TF_TEMPLATE,
1556        constants.INVOCATION_ID,
1557        constants.WORKUNIT_ID,
1558        constants.REQUEST_UPLOAD_RESULT,
1559        constants.DISABLE_UPLOAD_RESULT,
1560        constants.LOCAL_BUILD_ID,
1561        constants.BUILD_TARGET,
1562        constants.DRY_RUN,
1563        constants.DEVICE_ONLY,
1564    ):
1565      continue
1566    unsupported_args.append(arg)
1567  return supported_args, unsupported_args
1568
1569
1570def get_include_filter(
1571    test_infos: List[test_info.TestInfo], test_filter_arg: str = None
1572) -> List[str]:
1573  """Generate a list of tradefed filter argument from TestInfos.
1574
1575  Args:
1576      test_infos: a List of TestInfo object.
1577      test_filter_arg: the value of the desired test filter passed by the user
1578        using the --test-filter flag.
1579
1580  The include filter pattern looks like:
1581      --atest-include-filter <module-name>:<include-filter-value>
1582
1583  Returns:
1584      List of Tradefed command args.
1585  """
1586  tf_args = []
1587  for info in test_infos:
1588    # If a --test-filter is specified by the user, use the test filter in addition to the
1589    # fully qualified module:test#method name for each test.
1590    if test_filter_arg:
1591      formatted_test_filter_arg = (
1592          constants.TF_ATEST_INCLUDE_FILTER_VALUE_FMT.format(
1593              test_name=info.test_name, test_filter=test_filter_arg
1594          )
1595      )
1596      tf_args.extend(
1597          [constants.TF_ATEST_INCLUDE_FILTER, formatted_test_filter_arg]
1598      )
1599    filters = []
1600    for test_info_filter in info.data.get(constants.TI_FILTER, []):
1601      filters.extend(test_info_filter.to_list_of_tf_strings())
1602    for test_filter in filters:
1603      filter_arg = constants.TF_ATEST_INCLUDE_FILTER_VALUE_FMT.format(
1604          test_name=info.test_name, test_filter=test_filter
1605      )
1606      tf_args.extend([constants.TF_ATEST_INCLUDE_FILTER, filter_arg])
1607  return tf_args
1608
1609
1610@enum.unique
1611class Variant(enum.Enum):
1612  """The variant of a build module."""
1613
1614  NONE = ''
1615  HOST = 'host'
1616  DEVICE = 'target'
1617
1618  def __init__(self, suffix):
1619    self._suffix = suffix
1620
1621  @property
1622  def suffix(self) -> str:
1623    """The suffix without the 'dash' used to qualify build targets."""
1624    return self._suffix
1625
1626
1627@dataclasses.dataclass(frozen=True)
1628class Target:
1629  """A build target."""
1630
1631  module_name: str
1632  variant: Variant
1633
1634  def name(self) -> str:
1635    """The name to use on the command-line to build this target."""
1636    if not self.variant.suffix:
1637      return self.module_name
1638    return f'{self.module_name}-{self.variant.suffix}'
1639
1640
1641class Test(ABC):
1642  """A test that can be run."""
1643
1644  _DEFAULT_HARNESS_TARGETS = frozenset(
1645      [
1646          Target('atest-tradefed', Variant.HOST),
1647          Target('atest_script_help.sh', Variant.HOST),
1648          Target('atest_tradefed.sh', Variant.HOST),
1649          Target('tradefed', Variant.HOST),
1650      ]
1651      + [Target(t, Variant.HOST) for t in constants.GTF_TARGETS]
1652  )
1653
1654  def query_build_targets(self) -> Set[Target]:
1655    """Returns the list of build targets required to run this test."""
1656    build_targets = set()
1657    build_targets.update(self._get_harness_build_targets())
1658    build_targets.update(self._get_test_build_targets())
1659    return build_targets
1660
1661  @abstractmethod
1662  def query_runtime_targets(self) -> Set[Target]:
1663    """Returns the list of targets required during runtime."""
1664
1665  @abstractmethod
1666  def _get_test_build_targets(self) -> Set[Target]:
1667    """Returns the list of build targets of test and its dependencies."""
1668
1669  @abstractmethod
1670  def _get_harness_build_targets(self) -> Set[Target]:
1671    """Returns the list of build targets of test harness and its dependencies."""
1672
1673  @abstractmethod
1674  def requires_device(self) -> bool:
1675    """Returns true if the test requires a device, otherwise false."""
1676
1677  @abstractmethod
1678  def requires_device_update(self) -> bool:
1679    """Checks whether the test requires device update."""
1680
1681
1682class DeviceTest(Test):
1683  """A device test that can be run."""
1684
1685  def __init__(
1686      self, info: Dict[str, Any], variant: Variant, mainline_modules: Set[str]
1687  ):
1688
1689    self._info = info
1690    self._variant = variant
1691    self._mainline_modules = mainline_modules
1692
1693  def query_runtime_targets(self) -> Set[Target]:
1694    return self.query_build_targets() | _get_host_required_deps(self._info)
1695
1696  def requires_device(self):
1697    return True
1698
1699  def requires_device_update(self):
1700    # The test doesn't need device update as long as it's a unit test,
1701    # no matter if it's running on device or host.
1702    # Some tests (e.g. TF integration tests) do not have module info, and we
1703    # can't determine whether they require device update or not. So that we
1704    # treat them as they require device update to avoid disabling the device
1705    # update mistakenly.
1706    return not self._info or not module_info.ModuleInfo.is_unit_test(self._info)
1707
1708  def _get_test_build_targets(self) -> Set[Target]:
1709    module_name = self._info[constants.MODULE_INFO_ID]
1710    build_targets = set([Target(module_name, self._variant)])
1711    build_targets.update(_get_libs_deps(self._info, self._variant))
1712    build_targets.update(
1713        Target(m, Variant.NONE) for m in self._mainline_modules
1714    )
1715    return build_targets
1716
1717  def _get_harness_build_targets(self):
1718    build_targets = set(Test._DEFAULT_HARNESS_TARGETS)
1719    build_targets.update(
1720        set([
1721            Target('adb', Variant.HOST),
1722            Target('aapt', Variant.HOST),
1723            Target('aapt2', Variant.HOST),
1724            Target('compatibility-host-util', Variant.HOST),
1725        ])
1726    )
1727
1728    # Auto-generated Java tests use a module template that uses the Dalvik
1729    # test runner and requires the implementation jars. See
1730    # https://source.corp.google.com/android-internal/build/make/core/java_test_config_template.xml.
1731    # These dependencies should ideally be automatically added by the build
1732    # rule since Atest can fall out of sync otherwise.
1733    # TODO(b/284987354): Remove these targets once the build rule adds the
1734    # required deps.
1735    if _is_dalvik_test_module(self._info):
1736      build_targets.add(Target('cts-dalvik-host-test-runner', Variant.HOST))
1737      build_targets.add(Target('cts-dalvik-device-test-runner', Variant.DEVICE))
1738
1739    if 'vts' in self._info.get(constants.MODULE_COMPATIBILITY_SUITES, []):
1740      # Note that we do not include `compatibility-tradefed` which is
1741      # already included in the VTS harness.
1742      build_targets.add(Target('vts-core-tradefed-harness', Variant.HOST))
1743    else:
1744      build_targets.add(Target('compatibility-tradefed', Variant.HOST))
1745
1746    return build_targets
1747
1748
1749class DevicelessTest(Test):
1750
1751  def __init__(self, info: Dict[str, Any], variant: Variant):
1752    self._info = info
1753    self._variant = variant
1754
1755  def _get_test_build_targets(self) -> Set[Target]:
1756    module_name = self._info[constants.MODULE_INFO_ID]
1757    return set([Target(module_name, self._variant)])
1758
1759  def _get_harness_build_targets(self):
1760    build_targets = set(Test._DEFAULT_HARNESS_TARGETS)
1761    build_targets.update(
1762        set([
1763            # TODO(b/277116853): Remove the adb dependency for deviceless tests.
1764            Target('adb', Variant.HOST),
1765        ])
1766    )
1767    return build_targets
1768
1769  def query_runtime_targets(self) -> Set[Target]:
1770    return self.query_build_targets()
1771
1772  def requires_device(self):
1773    return False
1774
1775  def requires_device_update(self):
1776    return False
1777
1778
1779def _get_libs_deps(info: Dict[str, Any], variant: Variant) -> Set[Target]:
1780
1781  # We only need the runtime dependencies with host variant since TradeFed
1782  # won't push any runtime dependencies to the test device and the runtime
1783  # dependencies with device variant should already exist on the test device.
1784  if variant != Variant.HOST:
1785    return set()
1786
1787  deps = set()
1788  deps.update([Target(m, variant) for m in info.get(constants.MODULE_LIBS, [])])
1789
1790  return deps
1791
1792
1793def _get_host_required_deps(info: Dict[str, Any]) -> Set[Target]:
1794
1795  deps = set()
1796  deps.update(
1797      Target(m, Variant.HOST) for m in info.get(constants.MODULE_HOST_DEPS, [])
1798  )
1799
1800  return deps
1801
1802
1803def _is_dalvik_test_module(info: Dict[str, Any]) -> bool:
1804  return 'JAVA_LIBRARIES' in info.get(
1805      constants.MODULE_CLASS, []
1806  ) and True in info.get(constants.MODULE_AUTO_TEST_CONFIG, [])
1807