• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#
2# Copyright (C) 2017 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import cmd
18import ctypes
19import datetime
20import imp  # Python v2 compatibility
21import logging
22import multiprocessing
23import multiprocessing.pool
24import os
25import re
26import shutil
27import signal
28import socket
29import sys
30import tempfile
31import threading
32import time
33import urlparse
34
35from host_controller import common
36from host_controller.command_processor import command_adb
37from host_controller.command_processor import command_build
38from host_controller.command_processor import command_config
39from host_controller.command_processor import command_config_local
40from host_controller.command_processor import command_copy
41from host_controller.command_processor import command_device
42from host_controller.command_processor import command_dut
43from host_controller.command_processor import command_exit
44from host_controller.command_processor import command_fastboot
45from host_controller.command_processor import command_fetch
46from host_controller.command_processor import command_flash
47from host_controller.command_processor import command_gsispl
48from host_controller.command_processor import command_info
49from host_controller.command_processor import command_lease
50from host_controller.command_processor import command_list
51from host_controller.command_processor import command_password
52from host_controller.command_processor import command_release
53from host_controller.command_processor import command_retry
54from host_controller.command_processor import command_request
55from host_controller.command_processor import command_repack
56from host_controller.command_processor import command_sheet
57from host_controller.command_processor import command_shell
58from host_controller.command_processor import command_sleep
59from host_controller.command_processor import command_test
60from host_controller.command_processor import command_reproduce
61from host_controller.command_processor import command_upload
62from host_controller.build import build_info
63from host_controller.build import build_provider_ab
64from host_controller.build import build_provider_gcs
65from host_controller.build import build_provider_local_fs
66from host_controller.build import build_provider_pab
67from host_controller.utils.ipc import file_lock
68from host_controller.utils.ipc import shared_dict
69from host_controller.vti_interface import vti_endpoint_client
70from vts.runners.host import logger
71from vts.utils.python.common import cmd_utils
72
73COMMAND_PROCESSORS = [
74    command_adb.CommandAdb,
75    command_build.CommandBuild,
76    command_config.CommandConfig,
77    command_config_local.CommandConfigLocal,
78    command_copy.CommandCopy,
79    command_device.CommandDevice,
80    command_dut.CommandDUT,
81    command_exit.CommandExit,
82    command_fastboot.CommandFastboot,
83    command_fetch.CommandFetch,
84    command_flash.CommandFlash,
85    command_gsispl.CommandGsispl,
86    command_info.CommandInfo,
87    command_lease.CommandLease,
88    command_list.CommandList,
89    command_password.CommandPassword,
90    command_release.CommandRelease,
91    command_retry.CommandRetry,
92    command_request.CommandRequest,
93    command_repack.CommandRepack,
94    command_sheet.CommandSheet,
95    command_shell.CommandShell,
96    command_sleep.CommandSleep,
97    command_test.CommandTest,
98    command_reproduce.CommandReproduce,
99    command_upload.CommandUpload,
100]
101
102
103class NonDaemonizedProcess(multiprocessing.Process):
104    """Process class which is not daemonized."""
105
106    def _get_daemon(self):
107        return False
108
109    def _set_daemon(self, value):
110        pass
111
112    daemon = property(_get_daemon, _set_daemon)
113
114
115class NonDaemonizedPool(multiprocessing.pool.Pool):
116    """Pool class which is not daemonized."""
117
118    Process = NonDaemonizedProcess
119
120
121def JobMain(vti_address, in_queue, out_queue, device_status, password, hosts):
122    """Main() for a child process that executes a leased job.
123
124    Currently, lease jobs must use VTI (not TFC).
125
126    Args:
127        vti_client: VtiEndpointClient needed to create Console.
128        in_queue: Queue to get new jobs.
129        out_queue: Queue to put execution results.
130        device_status: SharedDict, contains device status information.
131                       shared between processes.
132        password: multiprocessing.managers.ValueProxy, a proxy instance of a
133                  string(ctypes.c_char_p) represents the password which is
134                  to be passed to the prompt when executing certain command
135                  as root user.
136        hosts: A list of HostController objects. Needed for the device command.
137    """
138
139    def SigTermHandler(signum, frame):
140        """Signal handler for exiting pool process explicitly.
141
142        Added to resolve orphaned pool process issue.
143        """
144        sys.exit(0)
145
146    signal.signal(signal.SIGTERM, SigTermHandler)
147
148    vti_client = vti_endpoint_client.VtiEndpointClient(vti_address)
149    console = Console(vti_client, None, None, hosts, job_pool=True)
150    console.device_status = device_status
151    console.password = password
152    multiprocessing.util.Finalize(console, console.__exit__, exitpriority=0)
153
154    while True:
155        command = in_queue.get()
156        if command == "exit":
157            break
158        elif command == "lease":
159            filepath, kwargs = vti_client.LeaseJob(socket.gethostname(), True)
160            logging.debug("Job %s -> %s" % (os.getpid(), kwargs))
161            if filepath is not None:
162                # TODO: redirect console output and add
163                # console command to access them.
164
165                console._build_provider[
166                    "pab"] = build_provider_pab.BuildProviderPAB()
167                console._build_provider[
168                    "gcs"] = build_provider_gcs.BuildProviderGCS()
169
170                for serial in kwargs["serial"]:
171                    console.ChangeDeviceState(
172                        serial, common._DEVICE_STATUS_DICT["use"])
173                print_to_console = True
174                if not print_to_console:
175                    sys.stdout = out
176                    sys.stderr = err
177
178                ret, gcs_log_url = console.ProcessConfigurableScript(
179                    os.path.join(os.getcwd(), "host_controller", "campaigns",
180                                 filepath), **kwargs)
181                if ret:
182                    job_status = "complete"
183                else:
184                    job_status = "infra-err"
185
186                vti_client.StopHeartbeat(job_status, gcs_log_url)
187                logging.info("Job execution complete. "
188                             "Setting job status to {}".format(job_status))
189
190                if not print_to_console:
191                    sys.stdout = sys.__stdout__
192                    sys.stderr = sys.__stderr__
193
194                for serial in kwargs["serial"]:
195                    console.ChangeDeviceState(
196                        serial, common._DEVICE_STATUS_DICT["ready"])
197
198                del console._build_provider["pab"]
199                del console._build_provider["gcs"]
200                console.fetch_info = {}
201                console._detailed_fetch_info = {}
202        else:
203            logging.error("Unknown job command %s", command)
204
205    out_queue.put("exit")
206
207
208class Console(cmd.Cmd):
209    """The console for host controllers.
210
211    Attributes:
212        command_processors: dict of string:BaseCommandProcessor,
213                            map between command string and command processors.
214        device_image_info: dict containing info about device image files.
215        prompt: The prompt string at the beginning of each command line.
216        test_result: dict containing info about the last test result.
217        test_suite_info: dict containing info about test suite package files.
218        tools_info: dict containing info about custom tool files.
219        scheduler_thread: dict containing threading.Thread instances(s) that
220                          update configs regularly.
221        _build_provider_pab: The BuildProviderPAB used to download artifacts.
222        _vti_address: string, VTI service URI.
223        _vti_client: VtiEndpoewrClient, used to upload data to a test
224                     scheduling infrastructure.
225        _tfc_client: The TfcClient that the host controllers connect to.
226        _hosts: A list of HostController objects.
227        _in_file: The input file object.
228        _out_file: The output file object.
229        _serials: A list of string where each string is a device serial.
230        _device_status: SharedDict, shared with process pool.
231                        contains status data on each devices.
232        _job_pool: bool, True if Console is created from job pool process
233                   context.
234        _password: multiprocessing.managers.ValueProxy, a proxy instance of a
235                   string(ctypes.c_char_p) represents the password which is
236                   to be passed to the prompt when executing certain command
237                   as root user.
238        _manager: SyncManager. an instance of a manager for shared objects and
239                  values between processes.
240        _vtslab_version: string, contains version information of vtslab package.
241                         (<git commit timestamp>:<git commit hash value>)
242        _detailed_fetch_info: A nested dict, holds the branch and target value
243                              of the device, gsi, or test suite artifact.
244        _file_lock: FileLock, an instance used for synchronizing the devices'
245                    use when the automated self-update happens.
246    """
247
248    def __init__(self,
249                 vti_endpoint_client,
250                 tfc,
251                 pab,
252                 host_controllers,
253                 vti_address=None,
254                 in_file=sys.stdin,
255                 out_file=sys.stdout,
256                 job_pool=False,
257                 password=None):
258        """Initializes the attributes and the parsers."""
259        # cmd.Cmd is old-style class.
260        cmd.Cmd.__init__(self, stdin=in_file, stdout=out_file)
261        self._build_provider = {}
262        self._job_pool = job_pool
263        if not self._job_pool:
264            self._build_provider["pab"] = pab
265            self._build_provider["gcs"] = build_provider_gcs.BuildProviderGCS()
266            self._build_provider[
267                "local_fs"] = build_provider_local_fs.BuildProviderLocalFS()
268            self._build_provider["ab"] = build_provider_ab.BuildProviderAB()
269            self._manager = multiprocessing.Manager()
270            self._device_status = shared_dict.SharedDict(self._manager)
271            self._password = self._manager.Value(ctypes.c_char_p, password)
272            try:
273                with open(common._VTSLAB_VERSION_TXT, "r") as file:
274                    self._vtslab_version = file.readline().strip()
275                    file.close()
276                    logging.info("VTSLAB version: %s" % self._vtslab_version)
277            except IOError as e:
278                logging.exception(e)
279                logging.error("Version info missing in vtslab package. "
280                              "Setting version as %s",
281                              common._VTSLAB_VERSION_DEFAULT_VALUE)
282                self._vtslab_version = common._VTSLAB_VERSION_DEFAULT_VALUE
283            self._logfile_upload_path = ""
284
285        self._vti_endpoint_client = vti_endpoint_client
286        self._vti_address = vti_address
287        self._tfc_client = tfc
288        self._hosts = host_controllers
289        self._in_file = in_file
290        self._out_file = out_file
291        self.prompt = "> "
292        self.command_processors = {}
293        self.device_image_info = build_info.BuildInfo()
294        self.test_result = {}
295        self.test_suite_info = build_info.BuildInfo()
296        self.tools_info = build_info.BuildInfo()
297        self.fetch_info = {}
298        self._detailed_fetch_info = {}
299        self.test_results = {}
300        self._file_lock = file_lock.FileLock()
301        self.repack_dest_path = ""
302
303        if common._ANDROID_SERIAL in os.environ:
304            self._serials = [os.environ[common._ANDROID_SERIAL]]
305        else:
306            self._serials = []
307
308        self.InitCommandModuleParsers()
309        self.SetUpCommandProcessors()
310
311        tempdir_base = os.path.join(os.getcwd(), "tmp")
312        if not os.path.exists(tempdir_base):
313            os.mkdir(tempdir_base)
314        self._tmpdir_default = tempfile.mkdtemp(dir=tempdir_base)
315        self._tmp_logdir = tempfile.mkdtemp(dir=tempdir_base)
316        if not self._job_pool:
317            self._logfile_path = logger.setupTestLogger(
318                self._tmp_logdir, create_symlink=False)
319
320    def __exit__(self):
321        """Finalizes the build provider attributes explicitly when exited."""
322        for bp in self._build_provider:
323            self._build_provider[bp].__del__()
324        if os.path.exists(self._tmp_logdir):
325            shutil.rmtree(self._tmp_logdir)
326
327    @property
328    def job_pool(self):
329        """getter for self._job_pool"""
330        return self._job_pool
331
332    @property
333    def device_status(self):
334        """getter for self._device_status"""
335        return self._device_status
336
337    @device_status.setter
338    def device_status(self, device_status):
339        """setter for self._device_status"""
340        self._device_status = device_status
341
342    @property
343    def build_provider(self):
344        """getter for self._build_provider"""
345        return self._build_provider
346
347    @property
348    def tmpdir_default(self):
349        """getter for self._password"""
350        return self._tmpdir_default
351
352    @tmpdir_default.setter
353    def tmpdir_default(self, tmpdir):
354        """getter for self._password"""
355        self._tmpdir_default = tmpdir
356
357    @property
358    def password(self):
359        """getter for self._password"""
360        return self._password
361
362    @password.setter
363    def password(self, password):
364        """getter for self._password"""
365        self._password = password
366
367    @property
368    def logfile_path(self):
369        """getter for self._logfile_path"""
370        return self._logfile_path
371
372    @property
373    def tmp_logdir(self):
374        """getter for self._tmp_logdir"""
375        return self._tmp_logdir
376
377    @property
378    def vti_endpoint_client(self):
379        """getter for self._vti_endpoint_client"""
380        return self._vti_endpoint_client
381
382    @property
383    def vtslab_version(self):
384        """getter for self._vtslab_version"""
385        return self._vtslab_version
386
387    @property
388    def detailed_fetch_info(self):
389        return self._detailed_fetch_info
390
391    def UpdateFetchInfo(self, artifact_type):
392        if artifact_type in common._ARTIFACT_TYPE_LIST:
393            self._detailed_fetch_info[artifact_type] = {}
394            self._detailed_fetch_info[artifact_type].update(self.fetch_info)
395        else:
396            logging.error("Unrecognized artifact type: %s", artifact_type)
397
398    @property
399    def file_lock(self):
400        """getter for self._file_lock"""
401        return self._file_lock
402
403    def ChangeDeviceState(self, serial, state):
404        """Changes a device's state and (un)locks the file lock if necessary.
405
406        Args:
407            serial: string, serial number of a device.
408            state: int, devices' status value pre-defined in
409                   common._DEVICE_STATUS_DICT.
410        Returns:
411            True if the state change and locking/unlocking are successful.
412            False otherwise.
413        """
414        if state == common._DEVICE_STATUS_DICT["use"]:
415            ret = self._file_lock.LockDevice(serial)
416            if ret == False:
417                return False
418
419        current_status = self.device_status[serial]
420        self.device_status[serial] = state
421
422        if (current_status in (common._DEVICE_STATUS_DICT["use"],
423                               common._DEVICE_STATUS_DICT["error"])
424                and current_status != state):
425            self._file_lock.UnlockDevice(serial)
426
427    def InitCommandModuleParsers(self):
428        """Init all console command modules"""
429        for name in dir(self):
430            if name.startswith('_Init') and name.endswith('Parser'):
431                attr_func = getattr(self, name)
432                if hasattr(attr_func, '__call__'):
433                    attr_func()
434
435    def SetUpCommandProcessors(self):
436        """Sets up all command processors."""
437        for command_processor in COMMAND_PROCESSORS:
438            cp = command_processor()
439            cp._SetUp(self)
440            do_text = "do_%s" % cp.command
441            help_text = "help_%s" % cp.command
442            setattr(self, do_text, cp._Run)
443            setattr(self, help_text, cp._Help)
444            self.command_processors[cp.command] = cp
445
446    def TearDown(self):
447        """Removes all command processors."""
448        for command_processor in self.command_processors.itervalues():
449            command_processor._TearDown()
450        self.command_processors.clear()
451        self.__exit__()
452
453    def FormatString(self, format_string):
454        """Replaces variables with the values in the console's dictionaries.
455
456        Args:
457            format_string: The string containing variables enclosed in {}.
458
459        Returns:
460            The formatted string.
461
462        Raises:
463            KeyError if a variable is not found in the dictionaries or the
464            value is empty.
465        """
466
467        def ReplaceVariable(match):
468            """Replacement functioon for re.sub().
469
470            replaces string encased in braces with values in the console's dict.
471
472            Args:
473                match: regex, used for extracting the variable name.
474
475            Returns:
476                string value corresponding to the input variable name.
477            """
478            name = match.group(1)
479            if name in ("build_id", "branch", "target", "account_id"):
480                value = self.fetch_info[name]
481            elif name in ("result_full", "result_zip", "suite_plan",
482                          "suite_name"):
483                value = self.test_result[name]
484            elif "timestamp" in name:
485                current_datetime = datetime.datetime.now()
486                value_date = current_datetime.strftime("%Y%m%d")
487                value_time = current_datetime.strftime("%H%M%S")
488                if "_date" in name:
489                    value = value_date
490                elif "_time" in name:
491                    value = value_time
492                elif "_year" in name:
493                    value = value_date[0:4]
494                elif "_month" in name:
495                    value = value_date[4:6]
496                elif "_day" in name:
497                    value = value_date[6:8]
498                else:
499                    value = "%s-%s" % (value_date, value_time)
500            elif name in ("hc_log", "hc_log_file", "hc_log_upload_path"):
501                # hc_log: full abs path to the current process's infra log.
502                # hc_log_file: infra log file name, with no path information.
503                # hc_log_upload_path: path of the infra log file in GCS.
504                value = self._logfile_path
505                if name == "hc_log_file":
506                    value = os.path.basename(value)
507                elif name == "hc_log_upload_path":
508                    value = self._logfile_upload_path
509            elif name in ("repack_path"):
510                value = self.repack_dest_path
511                self.repack_dest_path = ""
512            elif name in ("hostname"):
513                value = socket.gethostname()
514            elif "." in name and name.split(".")[0] in self.command_processors:
515                command, arg = name.split(".")
516                try:
517                    value = self.command_processors[command].arg_buffer[arg]
518                except KeyError as e:
519                    logging.exception(e)
520                    value = ""
521                if value is None:
522                    value = ""
523            else:
524                value = None
525
526            if value is None:
527                raise KeyError(name)
528
529            return value
530
531        return re.sub("{([^}]+)}", ReplaceVariable, format_string)
532
533    def ProcessScript(self, script_file_path):
534        """Processes a .py script file.
535
536        A script file implements a function which emits a list of console
537        commands to execute. That function emits an empty list or None if
538        no more command needs to be processed.
539
540        Args:
541            script_file_path: string, the path of a script file (.py file).
542
543        Returns:
544            True if successful; False otherwise
545        """
546        if not script_file_path.endswith(".py"):
547            logging.error("Script file is not .py file: %s" % script_file_path)
548            return False
549
550        script_module = imp.load_source('script_module', script_file_path)
551
552        commands = script_module.EmitConsoleCommands()
553        if commands:
554            for command in commands:
555                ret = self.onecmd(command)
556                if ret == False:
557                    return False
558        return True
559
560    def ProcessConfigurableScript(self, script_file_path, **kwargs):
561        """Processes a .py script file.
562
563        A script file implements a function which emits a list of console
564        commands to execute. That function emits an empty list or None if
565        no more command needs to be processed.
566
567        Args:
568            script_file_path: string, the path of a script file (.py file).
569            kwargs: extra args for the interface function defined in
570                    the script file.
571
572        Returns:
573            True if successful; False otherwise
574            String which represents URL to the upload infra log file.
575        """
576        if script_file_path and not script_file_path.endswith(".py"):
577            script_file_path += ".py"
578
579        if not script_file_path.endswith(".py"):
580            logging.error("Script file is not .py file: %s", script_file_path)
581            return False
582
583        ret = True
584
585        self._logfile_path, file_handler = logger.addLogFile(self._tmp_logdir)
586        src = self.FormatString("{hc_log}")
587        dest = self.FormatString(
588            "gs://vts-report/infra_log/{hostname}/%s_{timestamp}/{hc_log_file}"
589            % kwargs["build_target"])
590        self._logfile_upload_path = dest
591
592        script_module = imp.load_source('script_module', script_file_path)
593
594        commands = script_module.EmitConsoleCommands(**kwargs)
595        logging.info("Command list: %s", commands)
596        if commands:
597            logging.info("Console commands: %s", commands)
598            for command in commands:
599                ret = self.onecmd(command)
600                if ret == False:
601                    break
602        else:
603            ret = False
604
605        file_handler.flush()
606        infra_log_upload_command = "upload"
607        infra_log_upload_command += " --src=%s" % src
608        infra_log_upload_command += " --dest=%s" % dest
609        for serial in kwargs["serial"]:
610            if self.device_status[serial] == common._DEVICE_STATUS_DICT[
611                    "error"]:
612                self.vti_endpoint_client.SetJobStatusFromLeasedTo("bootup-err")
613                break
614        if not self.vti_endpoint_client.CheckBootUpStatus():
615            infra_log_upload_command += (" --report_path=gs://vts-report/"
616                                         "suite_result/{timestamp_year}/"
617                                         "{timestamp_month}/{timestamp_day}")
618            suite_name, plan_name = kwargs["test_name"].split("/")
619            infra_log_upload_command += (
620                " --result_from_suite=%s" % suite_name)
621            infra_log_upload_command += (" --result_from_plan=%s" % plan_name)
622        self.onecmd(infra_log_upload_command)
623        if self.GetSerials():
624            self.onecmd("device --update=stop")
625        logging.getLogger().removeHandler(file_handler)
626        os.remove(self._logfile_path)
627        return (ret != False), dest
628
629    def _Print(self, string):
630        """Prints a string and a new line character.
631
632        Args:
633            string: The string to be printed.
634        """
635        self._out_file.write(string + "\n")
636
637    def _PrintObjects(self, objects, attr_names):
638        """Shows objects as a table.
639
640        Args:
641            object: The objects to be shown, one object in a row.
642            attr_names: The attributes to be shown, one attribute in a column.
643        """
644        width = [len(name) for name in attr_names]
645        rows = [attr_names]
646        for dev_info in objects:
647            attrs = [
648                _ToPrintString(getattr(dev_info, name, ""))
649                for name in attr_names
650            ]
651            rows.append(attrs)
652            for index, attr in enumerate(attrs):
653                width[index] = max(width[index], len(attr))
654
655        for row in rows:
656            self._Print("  ".join(
657                attr.ljust(width[index]) for index, attr in enumerate(row)))
658
659    def DownloadTestResources(self, request_id):
660        """Download all of the test resources for a TFC request id.
661
662        Args:
663            request_id: int, TFC request id
664        """
665        resources = self._tfc_client.TestResourceList(request_id)
666        for resource in resources:
667            self.DownloadTestResource(resource['url'])
668
669    def DownloadTestResource(self, url):
670        """Download a test resource with build provider, given a url.
671
672        Args:
673            url: a resource locator (not necessarily HTTP[s])
674                with the scheme specifying the build provider.
675        """
676        parsed = urlparse.urlparse(url)
677        path = (parsed.netloc + parsed.path).split('/')
678        if parsed.scheme == "pab":
679            if len(path) != 5:
680                logging.error("Invalid pab resource locator: %s", url)
681                return
682            account_id, branch, target, build_id, artifact_name = path
683            cmd = ("fetch"
684                   " --type=pab"
685                   " --account_id=%s"
686                   " --branch=%s"
687                   " --target=%s"
688                   " --build_id=%s"
689                   " --artifact_name=%s") % (account_id, branch, target,
690                                             build_id, artifact_name)
691            self.onecmd(cmd)
692        elif parsed.scheme == "ab":
693            if len(path) != 4:
694                logging.error("Invalid ab resource locator: %s", url)
695                return
696            branch, target, build_id, artifact_name = path
697            cmd = ("fetch"
698                   "--type=ab"
699                   " --branch=%s"
700                   " --target=%s"
701                   " --build_id=%s"
702                   " --artifact_name=%s") % (branch, target, build_id,
703                                             artifact_name)
704            self.onecmd(cmd)
705        elif parsed.scheme == gcs:
706            cmd = "fetch --type=gcs --path=%s" % url
707            self.onecmd(cmd)
708        else:
709            logging.error("Invalid URL: %s", url)
710
711    def SetSerials(self, serials):
712        """Sets the default serial numbers for flashing and testing.
713
714        Args:
715            serials: A list of strings, the serial numbers.
716        """
717        self._serials = serials
718
719    def FlashImgPackage(self, package_path_gcs):
720        """Fetches a repackaged image set from GCS and flashes to the device(s).
721
722        Args:
723            package_path_gcs: GCS URL to the packaged img zip file. May contain
724                              the GSI imgs.
725        """
726        self.onecmd("fetch --type=gcs --path=%s --full_device_images=True" %
727                    package_path_gcs)
728        if common.FULL_ZIPFILE not in self.device_image_info:
729            logging.error("Failed to fetch the given file: %s",
730                          package_path_gcs)
731            return False
732
733        if not self._serials:
734            logging.error("Please specify the serial number(s) of target "
735                          "device(s) for flashing.")
736            return False
737
738        campaign_common = imp.load_source(
739            'campaign_common',
740            os.path.join(os.getcwd(), "host_controller", "campaigns",
741                         "campaign_common.py"))
742        flash_command_list = []
743
744        for serial in self._serials:
745            flash_commands = []
746            cmd_utils.ExecuteOneShellCommand(
747                "adb -s %s reboot bootloader" % serial)
748            _, stderr, retcode = cmd_utils.ExecuteOneShellCommand(
749                "fastboot -s %s getvar product" % serial)
750            if retcode == 0:
751                res = stderr.splitlines()[0].rstrip()
752                if ":" in res:
753                    product = res.split(":")[1].strip()
754                elif "waiting for %s" % serial in res:
755                    res = stderr.splitlines()[1].rstrip()
756                    product = res.split(":")[1].strip()
757                else:
758                    product = "error"
759            else:
760                product = "error"
761            logging.info("Device %s product type: %s", serial, product)
762            if product in campaign_common.FLASH_COMMAND_EMITTER:
763                flash_commands.append(
764                    campaign_common.FLASH_COMMAND_EMITTER[product](
765                        serial, repacked_imageset=True))
766            elif product != "error":
767                flash_commands.append(
768                    "flash --current --serial %s --skip-vbmeta=True" % serial)
769            else:
770                logging.error(
771                    "Device %s does not exist. Omitting the flashing "
772                    "to the device.", serial)
773                continue
774            flash_command_list.append(flash_commands)
775
776        ret = self.onecmd(flash_command_list)
777        if ret == False:
778            logging.error("Flash failed on device %s.", self._serials)
779        else:
780            logging.info("Flash succeeded on device %s.", self._serials)
781
782        return ret
783
784    def GetSerials(self):
785        """Returns the serial numbers saved in the console.
786
787        Returns:
788            A list of strings, the serial numbers.
789        """
790        return self._serials
791
792    def ResetSerials(self):
793        """Clears all the serial numbers set to this console obj."""
794        self._serials = []
795
796    def JobThread(self):
797        """Job thread which monitors and uploads results."""
798        thread = threading.currentThread()
799        while getattr(thread, "keep_running", True):
800            time.sleep(1)
801
802        if self._job_pool:
803            self._job_pool.close()
804            self._job_pool.terminate()
805            self._job_pool.join()
806
807    def StartJobThreadAndProcessPool(self):
808        """Starts a background thread to control leased jobs."""
809        self._job_in_queue = multiprocessing.Queue()
810        self._job_out_queue = multiprocessing.Queue()
811        self._job_pool = NonDaemonizedPool(
812            common._MAX_LEASED_JOBS, JobMain,
813            (self._vti_address, self._job_in_queue, self._job_out_queue,
814             self._device_status, self._password, self._hosts))
815
816        self._job_thread = threading.Thread(target=self.JobThread)
817        self._job_thread.daemon = True
818        self._job_thread.start()
819
820    def StopJobThreadAndProcessPool(self):
821        """Terminates the thread and processes that runs the leased job."""
822        if hasattr(self, "_job_thread"):
823            self._job_thread.keep_running = False
824            self._job_thread.join()
825
826    def WaitForJobsToExit(self):
827        """Wait for the running jobs to complete before exiting HC."""
828        if self._job_pool:
829            pool_process_count = common._MAX_LEASED_JOBS
830            for _ in range(common._MAX_LEASED_JOBS):
831                self._job_in_queue.put("exit")
832
833            while True:
834                response = self._job_out_queue.get()
835                if response == "exit":
836                    pool_process_count -= 1
837                if pool_process_count <= 0:
838                    break
839
840    # @Override
841    def onecmd(self, line, depth=1, ret_out_queue=None):
842        """Executes command(s) and prints any exception.
843
844        Parallel execution only for 2nd-level list element.
845
846        Args:
847            line: a list of string or string which keeps the command to run.
848        """
849        if not line:
850            return
851
852        if type(line) == list:
853            if depth == 1:  # 1 to use multi-threading
854                jobs = []
855                ret_queue = multiprocessing.Queue()
856                for sub_command in line:
857                    p = multiprocessing.Process(
858                        target=self.onecmd,
859                        args=(
860                            sub_command,
861                            depth + 1,
862                            ret_queue,
863                        ))
864                    jobs.append(p)
865                    p.start()
866                for job in jobs:
867                    job.join()
868
869                ret_cmd_list = True
870                while not ret_queue.empty():
871                    ret_from_subprocess = ret_queue.get()
872                    ret_cmd_list = ret_cmd_list and ret_from_subprocess
873                if ret_cmd_list == False:
874                    return False
875            else:
876                for sub_command in line:
877                    ret_cmd_list = self.onecmd(sub_command, depth + 1)
878                    if ret_cmd_list == False and ret_out_queue:
879                        ret_out_queue.put(False)
880                        return False
881            return
882
883        logging.info("Command: %s", line)
884        try:
885            ret_cmd = cmd.Cmd.onecmd(self, line)
886            if ret_cmd == False and ret_out_queue:
887                ret_out_queue.put(ret_cmd)
888            return ret_cmd
889        except Exception as e:
890            self._Print("%s: %s" % (type(e).__name__, e))
891            if ret_out_queue:
892                ret_out_queue.put(False)
893            return False
894
895    # @Override
896    def emptyline(self):
897        """Ignores empty lines."""
898        pass
899
900    # @Override
901    def default(self, line):
902        """Handles unrecognized commands.
903
904        Returns:
905            True if receives EOF; otherwise delegates to default handler.
906        """
907        if line == "EOF":
908            return self.do_exit(line)
909        return cmd.Cmd.default(self, line)
910
911
912def _ToPrintString(obj):
913    """Converts an object to printable string on console.
914
915    Args:
916        obj: The object to be printed.
917    """
918    if isinstance(obj, (list, tuple, set)):
919        return ",".join(str(x) for x in obj)
920    return str(obj)
921