#
# Copyright (C) 2017 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import socket
import time
import uuid

import httplib2
from googleapiclient import errors

from host_controller import invocation_thread
from host_controller.tradefed import remote_operation
from host_controller.tfc import command_attempt


class HostController(object):
    """Relays command tasks from TFC clusters to a TradeFed host.

    Attributes:
        _remote_client: The RemoteClient used to list devices and handed to
                        every invocation thread.
        _tfc_client: The TfcClient that command tasks are leased from.
        _hostname: A string, the name of the TradeFed host.
        _cluster_ids: A list of strings, the cluster IDs to lease from. The
                      first element is passed to the lease call separately
                      from the remaining ones.
        _invocation_threads: The list of InvocationThread objects that have
                             been started and not yet found terminated.
    """

    def __init__(self, remote_client, tfc_client, hostname, cluster_ids):
        """Stores the collaborators and starts with no invocation threads."""
        self._invocation_threads = []
        self._cluster_ids = cluster_ids
        self._hostname = hostname
        self._tfc_client = tfc_client
        self._remote_client = remote_client

    @property
    def hostname(self):
        """The name of the host, as given to the constructor."""
        return self._hostname

    def _JoinInvocationThreads(self):
        """Drops the threads that have finished from _invocation_threads."""

        def _StillRunning(worker):
            # A zero timeout makes join() non-blocking; it only gives a
            # finished thread the chance to be joined before the check.
            worker.join(0)
            return worker.is_alive()

        self._invocation_threads = [
            worker for worker in self._invocation_threads
            if _StillRunning(worker)]

    def _CreateInvocationThread(self, task):
        """Builds, without starting, the thread that executes a task.

        A new random UUID identifies the attempt. The attempt is associated
        with the first serial in task.device_serials, while the thread
        receives the whole list.

        Args:
            task: The CommandTask object.

        Returns:
            An InvocationThread.
        """
        new_attempt_id = uuid.uuid4()
        serials = task.device_serials
        attempt = command_attempt.CommandAttempt(
            task.task_id, new_attempt_id, self._hostname, serials[0])
        # The command line is split on whitespace into an argument list.
        argv = task.command_line.split()
        return invocation_thread.InvocationThread(
            self._remote_client, self._tfc_client, attempt, argv,
            task.device_serials)

    def ListDevices(self):
        """Lists the devices reported by the remote client, minus stubs.

        Returns:
            A list of DeviceInfo.
        """
        return [info for info in self._remote_client.ListDevices()
                if not info.IsStub()]

    def ListAvailableDevices(self):
        """Lists the devices that can be offered for new command tasks.

        Terminated invocation threads are discarded first, so only the
        serials held by threads that are still alive count as allocated.

        Returns:
            A list of DeviceInfo.
        """
        self._JoinInvocationThreads()
        busy_serials = set().union(
            *[worker.device_serials for worker in self._invocation_threads])

        free_devices = []
        for info in self.ListDevices():
            if not info.IsAvailable():
                continue
            if info.device_serial in busy_serials:
                continue
            free_devices.append(info)
        return free_devices

    def LeaseCommandTasks(self):
        """Leases command tasks and starts one daemon thread per task.

        Nothing is leased when no device is available.

        Returns:
            A list of CommandTask. The leased command tasks.
        """
        candidates = self.ListAvailableDevices()
        if not candidates:
            return []

        primary_cluster = self._cluster_ids[0]
        next_clusters = self._cluster_ids[1:]
        leased = self._tfc_client.LeaseHostTasks(
            primary_cluster, next_clusters, self._hostname, candidates)

        for leased_task in leased:
            worker = self._CreateInvocationThread(leased_task)
            worker.daemon = True
            worker.start()
            self._invocation_threads.append(worker)
        return leased

    def Run(self, poll_interval):
        """Polls TFC for tasks in an endless loop.

        Socket, remote operation and HTTP errors raised while leasing are
        logged and polling continues; any other exception propagates.

        Args:
            poll_interval: The poll interval in seconds.
        """
        while True:
            try:
                self.LeaseCommandTasks()
            except (socket.error,
                    remote_operation.RemoteOperationException,
                    httplib2.HttpLib2Error,
                    errors.HttpError) as error:
                logging.exception(error)
            time.sleep(poll_interval)